Подтвердить что ты не робот

Является ли BlockingQueue полностью потокобезопасным в Java

Я знаю, что в документации сказано, что объект является потокобезопасным, но означает ли это, что весь доступ к нему из всех методов является потокобезопасным? Так что, если я вызову put() для него из нескольких потоков одновременно и take() для него в один и тот же экземпляр, ничего плохого не произойдет?

Я спрашиваю, потому что этот ответ заставляет меня задуматься: qaru.site/info/126073/...

4b9b3361

Ответ 1

Быстрый ответ: да, они потокобезопасны. Но не оставляйте его там...

Во-первых, небольшой домик, BlockingQueue - это интерфейс, и любая реализация, которая не является потокобезопасной, будет нарушать документированный контракт. Ссылка, которую вы включили, ссылалась на LinkedBlockingQueue, которая имеет некоторую уловку.

Ссылка которую вы включили, делает интересное наблюдение, да есть две блокировки внутри LinkedBlockingQueue. Тем не менее, он не понимает, что краевой случай, когда "простая" реализация напала на фол, на самом деле обрабатывается, поэтому методы take and put более сложны, чем можно было бы ожидать вначале.

LinkedBlockingQueue оптимизирован, чтобы избежать использования одной и той же блокировки как при чтении, так и в записи, это уменьшает конкуренцию, однако для правильного поведения он полагается на то, что очередь не пуста. Когда в очереди есть элементы внутри, то нажатие и поп-точки не находятся в одной области памяти, и соперничество можно избежать. Однако, когда очередь пуста, конкуренция не может быть устранена, поэтому необходим дополнительный код для обработки этого общего "края". Это общий компромисс между сложностью кода и производительностью/масштабируемостью.

Затем возникает вопрос, как LinkedBlockingQueue знает, когда очередь пуста/не пуста и, таким образом, обрабатывает потоки? Ответ заключается в том, что он использует AtomicInteger и Condition как две дополнительные параллельные структуры данных. Элемент AtomicInteger используется для проверки того, равна ли длина очереди нулевым, и Условие используется для ожидания сигнала для уведомления ожидающего потока, когда очередь, вероятно, находится в желаемом состоянии. Эта дополнительная координация имеет накладные расходы, однако в измерениях было показано, что при увеличении количества одновременных потоков, что накладные расходы этого метода ниже, чем конкуренция, которая вводится с помощью одной блокировки.

Ниже я скопировал код из LinkedBlockingQueue и добавил комментарии, объясняющие, как они работают. На высоком уровне take() сначала блокирует все остальные вызовы take(), а затем сигналы put() по мере необходимости. put() работает аналогичным образом, сначала он блокирует все остальные вызовы put(), а затем сигналы take() при необходимости.

Из метода put():

    // putLock coordinates the calls to put() only; further coordination
    // between put() and take() follows below
    putLock.lockInterruptibly();
    try {
        // block while the queue is full; count is shared between put() and take()
        // and is safely visible between cores but prone to change between calls
        // a while loop is used because state can change between signals, which is
        // why signals get rechecked and resent.. read on to see more of that 
        while (count.get() == capacity) { 
                notFull.await();
        }

        // we know that the queue is not full so add
        enqueue(e);
        c = count.getAndIncrement();

        // if the queue is not full, send a signal to wake up 
        // any thread that is possibly waiting for the queue to be a little
        // emptier -- note that this is logically part of 'take()' but it
        // has to be here because take() blocks itself
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();

От take()

    takeLock.lockInterruptibly();
    try {
            // wait for the queue to stop being empty
            while (count.get() == 0) {
                notEmpty.await();
            }

        // remove element
        x = dequeue();

        // decrement shared count
        c = count.getAndDecrement();

        // send signal that the queue is not empty
        // note that this is logically part of put(), but
        // for thread coordination reasons is here
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();

Ответ 2

Да, все реализации BlockingQueue являются поточно-ориентированными для сдачи, взятия и всех действий.

Ссылка идет только наполовину... и не охватывает все детали. Это потокобезопасно.

Ответ 3

Этот ответ немного странный - для начала BlockingQueue - это интерфейс, поэтому он не имеет блокировок. Реализации, такие как ArrayBlockingQueue, используют один и тот же замок для add() и take(), поэтому все будет в порядке. Как правило, если какая-либо реализация не является потокобезопасной, то она является ошибкой реализации.

Ответ 4

Я думаю, что @Chris K упустил некоторые моменты. "Если в очереди есть элементы, то точки push и pop находятся не в одной и той же области памяти, и можно избежать конфликтов". Заметьте, что когда в очереди один элемент, head.next и tail указывают на одно и то же. node и put() и take() могут как получить блокировки, так и выполнить.

Я думаю, что пустое и полное условие может быть решено синхронизированными put() и take(). Однако, когда дело доходит до одного элемента, очередь lb имеет пустой узел-заглушку, который может иметь отношение к безопасности потока.

Ответ 5

LinkedBlockingQueue находится в пакете java.util.concurrent;

Ответ 6

Я пробовал эту реализацию на Leetcode  import java.util.concurrent.BlockingQueue;   import java.util.concurrent.LinkedBlockingDeque;

class FooBar {

    private final BlockingQueue<Object> line =  new LinkedBlockingDeque<>(1);
    private static final Object PRESENT = new Object();
    private int n;

    public FooBar(int n) {
        this.n = n;
    }

    public void foo(Runnable printFoo) throws InterruptedException {

        for (int i = 0; i < n; i++) {
            line.put(PRESENT);
            // printFoo.run() outputs "foo". Do not change or remove this line.
            printFoo.run();
        }
    }

    public void bar(Runnable printBar) throws InterruptedException {

        for (int i = 0; i < n; i++) {
            line.take();
            // printBar.run() outputs "bar". Do not change or remove this line.
            printBar.run();
        }
    }
}

При n = 3 чаще всего я получаю правильный ответ foobarfoobarfoorbar, но иногда я получаю barbarfoofoofoobar, что довольно удивительно. Я решил использовать с помощью ReentrantLock и Condition, @chris-k вы можете пролить больше света