Подтвердить что ты не робот

ExecutorService удивительная точка безубыточности производительности - эмпирические правила?

Я пытаюсь выяснить, как правильно использовать Java-исполнителей. Я понимаю, что выполнение задач для ExecutorService имеет свои собственные накладные расходы. Тем не менее, я удивлен, увидев, что это так высоко, как есть.

Моя программа должна обрабатывать огромное количество данных (данные на фондовом рынке) с максимально возможной задержкой. Большинство вычислений - довольно простые арифметические операции.

Я попытался проверить что-то очень простое: "Math.random() * Math.random()"

Простейший тест запускает это вычисление в простом цикле. Второй тест выполняет одно и то же вычисление внутри анонимного Runnable (это должно измерять стоимость создания новых объектов). Третий тест проходит Runnable до ExecutorService (это измеряет стоимость введения исполнителей).

Я провел тесты на моем ноутбуке dinky (2 процессора, 1,5 гигабайта):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(примерно один раз из четырех прогонов, первые два числа в итоге равны)

Обратите внимание, что исполнители занимают гораздо больше времени, чем выполнение в одном потоке. Номера были примерно одинаковыми для размеров пула потоков от 1 до 8.

Вопрос: Я пропустил что-то очевидное или ожидаются эти результаты? Эти результаты говорят мне, что любая задача, которую я передаю исполнителю, должна выполнять некоторые нетривиальные вычисления. Если я обрабатываю миллионы сообщений, и мне нужно выполнить очень простые (и дешевые) преобразования для каждого сообщения, я все равно не смогу использовать исполнителей... попытка распространения вычислений на нескольких процессорах может оказаться дороже, чем просто делая их в одном потоке. Конструктивное решение становится намного сложнее, чем я думал изначально. Любые мысли?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}
4b9b3361

Ответ 1

  • Использование исполнителей - это использование процессоров и/или процессорных ядер, поэтому, если вы создаете пул потоков, который в лучшем случае использует количество процессоров, у вас должно быть столько потоков, сколько процессоров/ядер.
  • Вы правы, слишком большие затраты на создание новых объектов. Таким образом, одним из способов сокращения расходов является использование партий. Если вы знаете объем и количество вычислений, вы создаете партии. Поэтому подумайте о тысячах вычислений, выполненных в одной выполненной задаче. Вы создаете партии для каждого потока. Как только вычисление будет выполнено (java.util.concurrent.Future), вы создадите следующую партию. Даже создание новых партий может быть выполнено в parralel (4 процессора → 3 потока для вычисления, 1 поток для пакетного обеспечения). В итоге вы можете увеличить пропускную способность, но с более высокими требованиями к памяти (партии, подготовка).

Изменить: я изменил ваш пример, и я позволил ему запустить мой маленький двухъядерный ноутбук x200.

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

Как вы видите в исходном коде, я также вывел из цикла измерение выделения ресурсов и исполнителей. Это более справедливо по сравнению с двумя другими методами.

Посмотрите результаты самостоятельно...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}

Ответ 2

Это не справедливый тест для пула потоков по следующим причинам,

  • Вы не используете преимущества объединения, потому что у вас есть только один поток.
  • Работа слишком проста, что накладные расходы пула не могут быть оправданы. Умножение на CPU с FPP занимает всего несколько циклов.

Учитывая следующие дополнительные шаги, пул потоков должен делать помимо создания объекта и выполнения задания,

  • Поместите задание в очередь
  • Удалить задание из очереди
  • Получить поток из пула и выполнить задание
  • Вернуть поток в пул

Когда у вас есть реальная работа и несколько потоков, преимущество пула потоков будет очевидным.

Ответ 3

Я не думаю, что это абсолютно реалистично, поскольку вы создаете новую службу-исполнителя каждый раз, когда вы вызываете вызов метода. Если у вас нет очень странных требований, которые кажутся нереальными - как правило, вы создадите службу, когда ваше приложение запустится, а затем отправьте ему задания.

Если вы повторите тест, но инициализируйте службу как поле, один раз, вне цикла синхронизации; то вы увидите фактические накладные расходы на отправку Runnables в службу и их запуск самостоятельно.

Но я не думаю, что вы полностью поняли суть вопроса: Исполнители не предназначены для повышения эффективности, они там, чтобы упростить координацию и передачу работы в пул потоков. Они всегда будут менее эффективны, чем просто вызывать Runnable.run() самостоятельно (так как в конце дня служба-исполнитель все равно должна это сделать, после предварительной предварительной уборки). Это когда вы используете их из нескольких потоков, требующих асинхронной обработки, что они действительно сияют.

Также считайте, что вы смотрите на относительную разницу во времени в основном фиксированной стоимости (накладные расходы Executor одинаковы, если ваши задачи занимают 1 мс или 1 час для запуска) по сравнению с очень небольшой переменной величиной (ваш тривиальный запуск). Если служба-исполнитель занимает 5 мс для выполнения задачи 1 мс, это не очень выгодная цифра. Если для выполнения 5-секундной задачи (например, нетривиального SQL-запроса) требуется 5 мс, то это совершенно ничтожно и полностью стоит.

Таким образом, в какой-то степени это зависит от вашей ситуации - если у вас есть чрезвычайно критически важный раздел, выполняющий множество небольших задач, которые не нужно выполнять параллельно или асинхронно, то вы ничего не получите от Исполнителя, Если вы обрабатываете более сложные задачи параллельно и хотите реагировать асинхронно (например, webapp), то Executors великолепны.

Являются ли они лучшим выбором для вас, зависит от вашей ситуации, но на самом деле вам нужно попробовать тесты с реалистичными репрезентативными данными. Я не думаю, что было бы целесообразно делать какие-либо выводы из проведенных вами тестов, если ваши задачи на самом деле тривиальны (и вы не хотите повторно использовать экземпляр-исполнитель...).

Ответ 4

Math.random() фактически синхронизируется с одним генератором случайных чисел. Вызов Math.random() приводит к существенному утверждению для генератора чисел. На самом деле, чем больше потоков у вас есть, тем медленнее будет.

Из Math.random() javadoc:

Этот метод правильно синхронизирован, чтобы обеспечить правильное использование более чем один поток. Однако, если многие потоки должны генерировать псевдослучайные чисел с большой скоростью, это может уменьшить конкуренцию для каждого потока имеют свой собственный генератор псевдослучайных чисел.

Ответ 5

"Накладные расходы", о которых вы говорите, не имеют ничего общего с ExecutorService, это вызвано синхронизацией нескольких потоков на Math.random, создавая конфликт блокировок.

Итак, да, вы что-то упустили (и "правильный" ответ ниже на самом деле не правильный).

Вот пример кода Java 8 для демонстрации 8 потоков, выполняющих простую функцию, в которой нет конфликта блокировок:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

Тайм-ауты результатов для 120 тестов 25 000 операций (мс):

  • Синхронное время выполнения: 9956
  • Синхронные партии время исполнения: 9900
  • Тема за одно время исполнения: 2176
  • Время выполнения пула исполнителей: 1922

Победитель: Исполнитель.

Ответ 6

Вот результаты на моей машине (OpenJDK 8 на 64-битном Ubuntu 14.0, Thinkpad W530)

simpleCompuation:6
computationWithObjCreation:5
computationWithObjCreationAndExecutors:33

Конечно, накладные расходы. Но помните, каковы эти цифры: миллисекунды для итераций 100 тыс. В вашем случае накладные расходы составляли около 4 микросекунд на итерацию. Для меня накладные расходы составляли около четверти микросекунды.

Накладные расходы - это синхронизация, внутренние структуры данных и, возможно, отсутствие оптимизации JIT из-за сложных кодовых путей (конечно, более сложных, чем ваш цикл for).

Задачи, которые вы на самом деле хотите распараллелить, будут стоить этого, несмотря на четверть микросекундных издержек.


FYI, это было бы очень плохим вычислением для распараллеливания. Я поднял нить до 8 (количество ядер):

simpleCompuation:5
computationWithObjCreation:6
computationWithObjCreationAndExecutors:38

Это не ускорилось. Это связано с тем, что Math.random() синхронизируется.

Ответ 7

Во-первых, есть несколько проблем с микрообъектом. Вы делаете разминки, что хорошо. Тем не менее, лучше запустить тест несколько раз, что должно дать представление о том, действительно ли оно прогрелось и дисперсия результатов. Также, как правило, лучше выполнять проверку каждого алгоритма в отдельных прогонах, иначе вы можете вызвать деоптимизацию при изменении алгоритма.

Задача очень маленькая, хотя я не совсем уверен, насколько малы. Таким образом, количество раз быстрее довольно бессмысленно. В многопоточных ситуациях он будет касаться одних и тех же изменчивых местоположений, так что потоки могут вызвать очень плохую производительность (используйте экземпляр Random для потока). Кроме того, пробег в 47 миллисекунд немного короткий.

Конечно, переход в другой поток для крошечной операции не будет быстрым. Разделите задачи на большие размеры, если это возможно. JDK7 выглядит так, как будто он будет иметь инфраструктуру fork-join, которая пытается поддерживать прекрасные задачи из алгоритмов разделения и покорения, предпочитая выполнять задачи в одном потоке по порядку, при этом большие задачи вытягиваются простыми потоками.

Ответ 8

Фиксированная версия ThreadPool предназначена для повторного использования уже созданных потоков. Таким образом, прирост производительности проявляется в отсутствии необходимости воссоздавать новый поток при каждом представлении задачи. Следовательно, время остановки должно быть взято внутри поставленной задачи. Как раз в последнем утверждении метода run.

Ответ 9

Вам нужно как-то выполнить групповое исполнение, чтобы представить большие части вычислений для каждого потока (например, группы сборки на основе символа запаса). Я получил лучшие результаты в подобных сценариях, используя Disruptor. У него очень низкие накладные расходы. Тем не менее его важная задача для группировки, наивный круговой ронин обычно создает много промахов в кэше.

см. http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html