Подтвердить что ты не робот

Задержка Java BlockingQueue на Linux

Я использую BlockingQueue: s (пытается как ArrayBlockingQueue, так и LinkedBlockingQueue) передавать объекты между разными потоками в приложении Im, которое работает в настоящее время. Производительность и латентность относительно важны в этом приложении, поэтому мне было любопытно, сколько времени требуется для передачи объектов между двумя потоками с помощью BlockingQueue. Чтобы измерить это, я написал простую программу с двумя потоками (один потребитель и один производитель), где я разрешаю производителю передавать временную метку (полученную с использованием System.nanoTime()) для потребителя, см. Код ниже.

Я помню, как читал где-то на каком-то форуме, что потребовалось около 10 микросекунд для кого-то, кто это пробовал (не знаю, на какой ОС и аппаратном обеспечении было), поэтому я не был слишком удивлен, когда мне понадобилось ~ 30 микросекунд мой ящик Windows 7 (процессор Intel E7500 Core 2 Duo, 2,93 ГГц), в то время как в фоновом режиме работает множество других приложений. Тем не менее, я был очень удивлен, когда я сделал тот же тест на нашем гораздо более быстром сервере Linux (два четырехъядерных процессора Intel X5677 3.46GHz, работающих под управлением Debian 5 с ядром 2.6.26-2-amd64). Я ожидал, что латентность будет ниже, чем у моего окна, но, наоборот, она была намного выше - ~ 75 - 100 микросекунд! Оба теста были выполнены с помощью Suns Hotspot JVM версии 1.6.0-23.

Кто-нибудь еще пробовал подобные тесты с аналогичными результатами в Linux? Или кто-нибудь знает, почему он намного медленнее в Linux (с лучшим оборудованием), может быть, что переключение потоков просто намного медленнее в Linux по сравнению с Windows? Если это так, то похоже, что окна на самом деле намного лучше подходят для некоторых приложений. Любая помощь, помогающая мне понять относительно высокие показатели, очень ценится.

Изменить
После комментария от DaveC я также проверил, где я ограничил JVM (на машине Linux) одним ядром (т.е. Все потоки, запущенные на одном ядре). Это резко изменило результаты - латентность снизилась до менее 20 микросекунд, то есть лучше, чем результаты на машине Windows. Я также провел несколько тестов, в которых я ограничил поток производителей одним ядром и потребительским потоком на другой (пытаясь как иметь их в одном и том же сокете и в разных сокетах), но это, похоже, не помогло - латентность все еще была ~ 75 микросекунд. Btw, это тестовое приложение - это почти все, что я запускаю на машине во время теста на выполнение.

Кто-нибудь знает, имеют ли эти результаты смысл? Должно ли быть действительно намного медленнее, если производитель и потребитель работают на разных ядрах? Любой вход действительно оценен.

Отредактировано снова (6 января):
Я экспериментировал с различными изменениями в коде и рабочей среде:

  • Я обновил ядро ​​Linux до 2.6.36.2 (от 2.6.26.2). После обновления ядра измеренное время изменилось на 60 микросекунд с очень небольшими вариациями, начиная с 75-100 до обновления. Настройка близости процессора к потоку производителя и потребителя не имела никакого эффекта, за исключением случаев, когда они ограничивали их одним ядром. При работе на одном и том же ядре измеряемая латентность составляла 13 микросекунд.

  • В исходном коде я попросил продюсера спать в течение 1 секунды между каждой итерацией, чтобы дать потребителю достаточно времени, чтобы вычислить прошедшее время и распечатать его на консоли. Если я удалю вызов Thread.sleep() и вместо этого позволю как барьер производителя, так и потребительский вызов .await() на каждой итерации (потребитель называет его после печати прошедшего времени на консоль), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд. При запуске потоков на одном и том же ядре латентность становится ниже 1 микросекунды. Может ли кто-нибудь объяснить, почему это значительно сократило латентность? Мое первое предположение заключалось в том, что изменение привело к тому, что продюсер назвал queue.put() перед тем, как потребитель назвал queue.take(), поэтому потребителю никогда не приходилось блокировать, но после игры с модифицированной версией ArrayBlockingQueue я обнаружил это предположение было ложным - потребитель действительно блокировал. Если у вас есть другие предположения, пожалуйста, дайте мне знать. (Кстати, если я позволю продюсеру назвать как Thread.sleep(), так и барьер .await(), латентность остается на 60 микросекунд).

  • Я также пробовал другой подход - вместо вызова queue.take() я вызывал queue.poll() с тайм-аутом в 100 микронов. Это уменьшило среднюю задержку до менее 10 микросекунд, но, конечно, намного интенсивнее процессора (но, вероятно, менее интенсивный процессор, ожидающий ожидание?).

Отредактировано снова (10 января) - Проблема решена:
ninjalj предположил, что латентность ~ 60 микросекунд была вызвана тем, что ЦП должен был проснуться от более глубоких состояний сна - и он был совершенно прав! После отключения C-состояний в BIOS латентность была уменьшена до < 10 микросекунд. Это объясняет, почему я получил гораздо лучшую задержку в пункте 2 выше - когда я отправлял объекты чаще, процессор был достаточно занят, чтобы не переходить в более глубокие состояния сна. Большое спасибо всем, кто нашел время, чтобы прочитать мой вопрос и поделился своими мыслями здесь!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}
4b9b3361

Ответ 1

Ваш тест не является хорошим показателем задержки передачи в очереди, потому что у вас есть один поток, читающий очередь, которая синхронно пишет с System.out (делая String и длинную конкатенацию, пока она находится на ней), прежде чем она снова понадобится. Чтобы правильно измерить это, вам нужно переместить эту активность из этой нити и сделать как можно меньше работы в принимающей нитке.

Вам лучше было бы сделать расчет (тогда-сейчас) в берущем и добавить результат в какую-то другую коллекцию, которая периодически сливается другим потоком, который выводит результаты. Я имею тенденцию делать это, добавляя к соответствующей структуре, поддерживаемой соответствующим массивом, доступ к которой осуществляется через AtomicReference (поэтому поток отчетности просто должен getAndSet в этой ссылке с другим экземпляром этой структуры хранения, чтобы захватить последнюю партию результатов, например, сделать 2 списки, устанавливают один как активный, каждый поток xsa просыпается и меняет активный и пассивный). Затем вы можете сообщать о некоторых дистрибутивах вместо каждого отдельного результата (например, диапазон дециля), что означает, что вы не генерируете огромные файлы журналов при каждом запуске и получаете полезную информацию, напечатанную для вас.

FWIW Я согласен со временами, когда Питер Лори заявил, и если латентность действительно важна, тогда вам нужно подумать о оживленном ожидании с соответствующей привязкой к процессору (т.е. выделить ядро ​​для этого потока)

ИЗМЕНИТЬ после 6 января

Если я удалю вызов Thread.sleep() и вместо этого позволю как барьер производителя, так и потребительский вызов.аваит() на каждой итерации (потребитель называет его после печати прошедшего времени на консоль), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд. При запуске потоков на одном и том же ядре латентность становится ниже 1 микросекунды. Может ли кто-нибудь объяснить, почему это значительно сократило время ожидания?

Вы смотрите на разницу между java.util.concurrent.locks.LockSupport#park (и соответствующими unpark) и Thread#sleep. Большинство j.u.c. материал построен на LockSupport (часто через AbstractQueuedSynchronizer, который ReentrantLock предоставляет или напрямую), и этот (в Hotspot) разрешается до sun.misc.Unsafe#parkunpark), и это, как правило, оказывается в руках pthread (posix threads) lib. Обычно pthread_cond_broadcast для пробуждения и pthread_cond_wait или pthread_cond_timedwait для таких вещей, как BlockingQueue#take.

Я не могу сказать, что я когда-либо смотрел, как фактически реализуется Thread#sleep (потому что я никогда не сталкивался с чем-то низкой латентностью, которая не является условием ожидания), но я бы предположил, что это заставляет ее быть пониженными по графику более агрессивным способом, чем механизм сигнализации pthread, и именно это объясняет разницу в задержках.

Ответ 2

Я бы использовал только ArrayBlockingQueue, если можно. Когда я использовал его, время ожидания составляло от 8 до 18 микросекунд в Linux. Некоторые замечания.

  • Стоимость - это в основном время, необходимое для пробуждения темы. Когда вы пробуждаете поток, его данные/код не будут находиться в кеше, поэтому вы обнаружите, что если вы заметите, что произойдет после того, как поток проснулся, что может занять 2-5 раз больше, чем если бы вы повторяли одно и то же много раз.
  • В некоторых операциях используются вызовы ОС (например, блокировка/циклические барьеры), они часто более дороги в сценарии с низкой задержкой, чем ожидание. Я предлагаю попробовать заняться ждать вашего продюсера, а не использовать CyclicBarrier. Вы могли бы оживить вашего потребителя, но это может быть неоправданно дорогостоящим в реальной системе.

Ответ 3

@Peter Lawrey

В некоторых операциях используются вызовы ОС (например, блокирующие/циклические барьеры)

Это НЕ ОС (ядро). Реализовано с помощью простого CAS (который на x86 также выходит за пределы свободной памяти)

Еще одно: не используйте ArrayBlockingQueue, если вы не знаете, почему (вы его используете).

@OP: Посмотрите на ThreadPoolExecutor, он предлагает отличную структуру для производителей/потребителей.

Изменить ниже:

чтобы уменьшить задержку (запретить ожидание ожидания), измените очередь на SynchronousQueue, добавьте следующее, как перед запуском пользователя

...
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();

Это лучшее, что вы можете получить.


Edit2: Здесь w/sync. очередь. И не распечатывать результаты.

package t1;

import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

public class QueueTest {

    static final int RUNS = 250000;

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();

    int sleep = 1000;

    long[] results  = new long[0];
    public void start(final int runs) throws Exception {
        results = new long[runs];
        final CountDownLatch barrier = new CountDownLatch(1);
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                barrier.countDown();
                try {

                    for(int i = 0; i < runs; i++) {                        
                        results[i] = consume(); 

                    }
                } catch (Exception e) {
                    return;
                } 
            }
        });
        consumerThread.setPriority(Thread.MAX_PRIORITY);
        consumerThread.start();


        barrier.await();
        final long sleep = this.sleep;
        for(int i = 0; i < runs; i++) {
            try {                
                doProduce(sleep);

            } catch (Exception e) {
                return;
            }
        }
    }

    private void doProduce(final long sleep) throws InterruptedException {
        produce();
    }

    public void produce() throws InterruptedException {
        queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
    }

    public long consume() throws InterruptedException {
        long t = queue.take();
        long now = System.nanoTime();
        return now-t;
    }

    public static void main(String[] args) throws Throwable {           
        QueueTest test = new QueueTest();
        System.out.println("Starting + warming up...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start(15000);//10k is the normal warm-up for -server hotspot
        // Run again, printing the results
        System.gc();
        System.out.println("Starting again...");
        test.sleep = 1000;//ignored now
        Thread.yield();
        test.start(RUNS);
        long sum = 0;
        for (long elapsed: test.results){
            sum+=elapsed;
        }
        BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);        
        System.out.printf("Avg: %1.3f micros%n", elapsed); 
    }
}

Ответ 4

Если задержка является критичной и вам не нужна строгая семантика FIFO, тогда вы можете рассмотреть JSR-166 LinkedTransferQueue. Он позволяет исключить, чтобы противоположные операции могли обменивать значения вместо синхронизации в структуре данных очереди. Такой подход помогает сократить конкуренцию, обеспечивает параллельный обмен и позволяет избежать штрафов за сон/пробуждение нити.