Подтвердить что ты не робот

Реализация BlockingQueue: каковы различия между SynchronousQueue и LinkedBlockingQueue

Я вижу эту реализацию BlockingQueue и не могу понять различия между ними. Мой вывод:

  • Мне никогда не понадобится SynchronousQueue
  • LinkedBlockingQueue обеспечивает FIFO, BlockingQueue должен быть создан с параметром true, чтобы сделать его FIFO
  • SynchronousQueue ломает метод большинства коллекций (содержит, размер и т.д.)

Итак, когда мне когда-нибудь понадобится SynchronousQueue? Является ли производительность этой реализации лучше, чем LinkedBlockingQueue?

Чтобы сделать его более сложным... почему Executors.newCachedThreadPool использует SynchronousQueue, когда остальные (Executors.newSingleThreadExecutor и Executors.newFixedThreadPool) использовать LinkedBlockingQueue?

ИЗМЕНИТЬ

Первый вопрос решается. Но я до сих пор не понимаю, почему Executors.newCachedThreadPool использует SynchronousQueue, когда другие (Executors.newSingleThreadExecutor и Executors.newFixedThreadPool) используют LinkedBlockingQueue?

То, что я получаю, с SynchronousQueue, производитель будет заблокирован, если нет свободной нити. Но поскольку количество потоков практически неограничено (при необходимости будут созданы новые потоки), этого никогда не произойдет. Итак, почему он должен использовать SynchronousQueue?

4b9b3361

Ответ 1

SynchronousQueue - это особый вид очереди - он реализует подход рандеву (производитель ждет, пока потребитель не будет готов, потребитель ждет, пока производитель не будет готов) за интерфейсом Queue.

Поэтому вам может понадобиться это только в особых случаях, когда вам нужна эта конкретная семантика, например Одиночная нить задачи без очередей последующих запросов.

Другой причиной использования SynchronousQueue является производительность. Реализация SynchronousQueue, по-видимому, сильно оптимизирована, поэтому, если вам не нужно ничего больше, чем точка рандеву (как в случае Executors.newCachedThreadPool(), где потребители создаются "по требованию", t накапливается), вы можете получить прирост производительности с помощью SynchronousQueue.

Простой синтетический тест показывает, что простой простой производитель - один потребительский сценарий с пропускной способностью двухъядерных машин SynchronousQueue составляет ~ 20 раз выше пропускной способности LinkedBlockingQueue и ArrayBlockingQueue с длиной очереди = 1. Когда очередь длина увеличивается, их пропускная способность возрастает и почти достигает пропускной способности SynchronousQueue. Это означает, что SynchronousQueue имеет низкую нагрузку на синхронизацию на многоядерных машинах по сравнению с другими очередями. Но опять же, это имеет значение только в особых обстоятельствах, когда вам нужна точка рандеву, замаскированная под Queue.

EDIT:

Вот тест:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

И вот результат на моей машине:

enter image description here

Ответ 2

Пул потоков кэша создает потоки по требованию. Ему нужна очередь, которая либо передает задачу ожидающему потребителю, либо терпит неудачу. Если нет ожидающего потребителя, он создает новый поток. SynchronousQueue не содержит элемент, вместо этого он передает элемент или терпит неудачу.

Ответ 3

В настоящее время по умолчанию Executors (ThreadPoolExecutor) можно использовать набор заранее созданных потоков фиксированного размера и BlockingQueue определенного размера для любого переполнения или создавать потоки с максимальным размером, если (и только если), что очередь заполнена.

Это приводит к некоторым неожиданным свойствам. Например, поскольку дополнительные потоки создаются только после достижения емкости очереди, использование LinkedBlockingQueue (которое является неограниченным) означает, что новые потоки никогда не будут созданы, даже если текущий размер пула равен нулю. Если вы используете ArrayBlockingQueue, тогда новые потоки создаются только в том случае, если они заполнены, и существует разумная вероятность того, что последующие задания будут отклонены, если пул не очистил пространство к тому времени.

A SynchronousQueue имеет нулевую емкость, поэтому производитель блокируется до тех пор, пока не будет доступен потребитель, или создается поток. Это означает, что, несмотря на впечатляющие фигуры, созданные @axtavt, кэшированный пул потоков обычно имеет худшую производительность с точки зрения производителя.

К сожалению, в настоящее время нет хорошей версии библиотеки компромиссной реализации, которая будет создавать потоки во время всплесков или активности до некоторого максимума с минимального минимума. У вас либо есть растущий пул, либо фиксированный. У нас есть один внутренний, но он еще не готов к общественному потреблению.