Подтвердить что ты не робот

Компьютерная карта: вычисление стоимости загодя

У меня есть расчетная картамягкие значения), который я использую для кэширования результатов дорогостоящего вычисления.

Теперь у меня есть ситуация, когда я знаю, что конкретный ключ, скорее всего, будет рассмотрен в течение следующих нескольких секунд. Этот ключ также более дорог для вычисления, чем большинство.

Я хотел бы заранее вычислить значение в потоке с минимальным приоритетом, чтобы при запросе значения он уже был кэширован, улучшив время отклика.

Что такое хороший способ сделать это, чтобы:

  • У меня есть контроль над потоком (в частности, его приоритетом), в котором выполняется вычисление.
  • Дублировать работу можно избежать, т.е. вычисление выполняется только один раз. Если задача вычисления уже запущена, то вызывающий поток ожидает эту задачу вместо вычисления значения снова (FutureTask реализует это. С помощью вычислений Guava это верно, если вы вызываете только get, но нет, если вы смешиваете его с вызовами до put.)
  • Метод "вычислять значение заранее" является асинхронным и идемпотентным. Если вычисление уже выполняется, оно должно немедленно вернуться, не дожидаясь завершения этого вычисления.
  • Избегать инверсии приоритета, например. если поток с высоким приоритетом запрашивает значение, а поток среднего приоритета выполняет что-то несвязанное, но задача вычисления ставится в очередь на поток с низким приоритетом, поток с высоким приоритетом не должен быть голоден. Возможно, это может быть достигнуто путем временного повышения приоритета вычислительного потока (ов) и/или выполнения вычисления в вызывающем потоке.

Как это можно скоординировать между всеми задействованными нитями?


Дополнительная информация
Вычисления в моем приложении - операции фильтрации изображений, что означает, что все они связаны с процессором. Эти операции включают в себя аффинные преобразования (от 50 мкс до 1 мс) и свертки (до 10 мс). Конечно, эффективность различных приоритетов потоков зависит от способности ОС упреждать более крупные задачи.

4b9b3361

Ответ 1

Вы можете организовать только однократное выполнение фонового вычисления с помощью Future с помощью ComputedMap. Будущее представляет задачу, вычисляющую значение. Будущее создается ComputedMap и в то же время передается ExecutorService для выполнения фона. Исполнитель может быть настроен с помощью собственной ThreadFactory, которая создает потоки с низким приоритетом, например.

class LowPriorityThreadFactory implements ThreadFactory
{
   public Thread newThread(Runnable r) {
     Tread t = new Thread(r);
     t.setPriority(MIN_PRIORITY);
     return t;
   }
}

Когда это значение необходимо, ваш высокоприоритетный поток затем извлекает будущее из карты и вызывает метод get() для получения результата, ожидая его вычисления при необходимости. Чтобы избежать инверсия приоритета, вы добавляете в команду дополнительный код:

class HandlePriorityInversionTask extends FutureTask<ResultType>
{
   Integer priority;  // non null if set
   Integer originalPriority;
   Thread thread;
   public ResultType get() {
      if (!isDone()) 
         setPriority(Thread.currentThread().getPriority());
      return super.get();
   }
   public void run() {
      synchronized (this) {
         thread = Thread.currentThread();
         originalPriority = thread.getPriority();
         if (priority!=null) setPriority(priority);
      } 
      super.run();
   }
   protected synchronized void done() {
         if (originalPriority!=null) setPriority(originalPriority);
         thread = null;
   }

   void synchronized setPriority(int priority) {
       this.priority = Integer.valueOf(priority);
       if (thread!=null)
          thread.setPriority(priority);
   }
}

Это поможет повысить приоритет задачи до приоритета потока, вызывающего get(), если задача не завершена, и возвращает приоритет оригиналу, когда задача завершается, как обычно, или иначе. (Чтобы сохранить его кратким, код не проверяет, действительно ли приоритет больше, но его легко добавить.)

При вызове задачи с приоритетным вызовом get() будущее может еще не начаться. У вас может возникнуть соблазн избежать этого, установив большую верхнюю границу количества потоков, используемых службой-исполнителем, но это может быть плохой идеей, поскольку каждый поток может работать с высоким приоритетом, потребляя столько же процессора, сколько может раньше ОС отключает его. Вероятно, пул должен быть того же размера, что и количество аппаратных потоков, например. размер пула до Runtime.availableProcessors(). Если задача не запущена, а не ждать, пока исполнитель заплатит ее (что является формой инверсии приоритета, так как ваш поток с высоким приоритетом ожидает завершения потоков с низким приоритетом), вы можете отказаться от него текущий исполнитель и повторно отправить на исполнитель, работающий только с высокоприоритетными потоками.

Ответ 2

Одним из распространенных способов координации такого типа ситуации является наличие карты, значения которой являются объектами FutureTask. Итак, украв в качестве примера некоторый код, который я написал с моего веб-сервера, основная идея заключается в том, что для данного параметра мы видим, есть ли уже FutureTask (что означает, что расчет с этим параметром уже запланирован) и если так мы ждем его. В этом примере мы иначе планируем поиск, но это может быть сделано в другом месте с помощью отдельного вызова, если это было бы желательно:

  private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      Callable<CharSequence> ex = new Callable<CharSequence>() {
        public CharSequence call() throws Exception {
          return doCalculation(word);
        }
      };
      Future<CharSequence> ft = executor.submit(ex);
      f = cache.putIfAbsent(word, ft);
      if (f != null) {
        // somebody slipped in with the same word -- cancel the
        // lookup we've just started and return the previous one
        ft.cancel(true);
      } else {
        f = ft;
      }
    }
    return f;
  }

С точки зрения приоритетов потоков: интересно, достигнет ли это того, что вы думаете? Я не совсем понимаю вашу мысль о повышении приоритета поиска над ожидающим потоком: если поток ожидает, то он ждет, независимо от относительных приоритетов других потоков... (Возможно, вам стоит взглянуть на некоторые статьи, которые я написал на приоритеты потоков и планирование потоков, но, чтобы сократить длинную историю, я не уверен, что изменение приоритета обязательно купит вам то, что вы ожидаете.)

Ответ 3

Я подозреваю, что вы движетесь по неправильному пути, сосредоточившись на приоритетах потоков. Обычно данные, хранящиеся в кеше, являются дорогостоящими для вычисления из-за ввода-вывода (данные вне памяти) и ограничения по процессору (логическое вычисление). Если вы предпочтете угадать будущее пользователя, например, глядя на непрочитанные электронные письма, то это указывает на то, что ваша работа, скорее всего, связана с I/O. Это означает, что до тех пор, пока головоломка потока не возникает (какие планировщики запрещают), игра в игры с приоритетом потока не будет предлагать большую часть улучшения производительности.

Если стоимость представляет собой вызов ввода-вывода, фоновый поток блокируется в ожидании поступления данных и обработки этих данных достаточно дешево (например, десериализация). Поскольку изменение приоритета потока не будет предлагать большую часть ускорения, выполнение работы асинхронно на фоновом потоке должно быть достаточным. Если ограничение промаха в кеше слишком велико, то использование нескольких уровней кэширования имеет тенденцию способствовать дальнейшему снижению воспринимаемой пользователем задержки.

Ответ 4

В качестве альтернативы приоритетам потоков вы можете выполнять задачу с низким приоритетом, только если не выполняются высокоприоритетные задачи. Вот простой способ сделать это:

AtomicInteger highPriorityCount = new AtomicInteger();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    highPriorityCount.decrementAndGet();  
  }
}

void lowPriorityTask() {
  if (highPriorityCount.get() == 0) {
    lowPriorityImpl();
  }
}

В вашем случае использования оба метода Impl() будут вызывать get() на вычислительной карте, highPriorityImpl() в том же потоке и lowPriorityImpl() в другом потоке.

Вы можете написать более сложную версию, которая отсылает низкоприоритетные задачи до тех пор, пока не будут выполнены высокоприоритетные задачи и не будет ограничено количество одновременных низкоприоритетных задач.