Подтвердить что ты не робот

"Закрытие" блокирующей очереди

Im, используя java.util.concurrent.BlockingQueue в очень простом сценарии производителя и потребителя. Например. этот псевдо-код изображает потребительскую часть:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Пока все хорошо. В javadoc блокирующей очереди я читал:

A BlockingQueue не является внутренне поддерживать любые "близкие" или "выключение", чтобы указать, что больше предметов не будет добавлено. Нужды и использование таких функций, как правило, зависит от реализации. Например, общая тактика заключается в том, что производители вставить специальный конец потока или яд объекты, которые интерпретируются соответственно, когда их принимают потребители.

К сожалению, из-за использования дженериков и характера ComplexObject его нетривиально, чтобы вставить "ядовитый объект" в очередь. Так что эта "общая тактика" в моем сценарии не очень удобна.

Мой вопрос: какие еще хорошие тактики/шаблоны я могу использовать для "закрытия" очереди?

Спасибо!

4b9b3361

Ответ 1

Если у вас есть дескриптор потребительского потока, вы можете его прервать. С кодом, который вы дали, это убьет потребителя. Я бы не ожидал, что производитель получит это; ему, вероятно, придется обратный вызов программному контроллеру, чтобы сообщить ему об этом. Затем контроллер будет прерывать потребительский поток.

Вы всегда можете закончить работу перед тем, как послушать прерывание. Например:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Я бы предпочел, чтобы так или иначе отравить очередь. Если вы хотите убить поток, попросите нить убить себя.

(Хорошо видеть, что кто-то правильно управляет InterruptedException.)


Кажется, что есть некоторые сомнения относительно обработки прерываний здесь. Во-первых, я хотел бы, чтобы все прочитали эту статью: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

Теперь, понимая, что никто на самом деле не читал это, вот сделка. Поток получает только InterruptedException, если он в настоящий момент блокируется во время прерывания. В этом случае Thread.interrupted() вернет false. Если он не блокирует, он не получит это исключение, а вместо Thread.interrupted() вернется true. Таким образом, ваш защитник цикла должен абсолютно, независимо от того, проверьте Thread.interrupted() или иным образом не потеряйте прерывание потока.

Итак, поскольку вы проверяете Thread.interrupted() независимо от того, что вы, и вы вынуждены ловить InterruptedException (и должны иметь дело с ним, даже если вы не были вынуждены), теперь у вас есть две области кода, которые обрабатывают одно и то же событие, прерывание потока. Один из способов справиться с этим - нормализовать их в одно условие, то есть либо проверка булевого состояния может вызывать исключение, либо исключение может устанавливать логическое состояние. Я выбираю позже.


Изменить: Обратите внимание, что статический метод прерывания Thread # очищает прерванный статус текущего потока.

Ответ 2

Альтернативой было бы обернуть обработку, которую вы делаете, с ExecutorService, и пусть сам ExecutorService определяет, добавляются ли задания в очередь.

В принципе, вы используете ExecutorService.shutdown(), который при вызове запрещает выполнение каких-либо задач исполнителем.

Я не уверен, как вы в настоящее время отправляете задачи в QueueConsumer в своем примере. Я предположил, что у вас есть какой-то метод submit(), и в этом примере использовался аналогичный метод.

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Этот пример представляет собой только ту же иллюстрацию, что предлагает пакет java.util.concurrent; вероятно, есть некоторые оптимизации, которые могут быть сделаны для него (например, QueueConsumer, поскольку его собственный класс, вероятно, даже не нужен, вы можете просто предоставить ExecutorService всем производителям, отправляющим задания).

Выполните пакет java.util.concurrent (начиная с некоторых ссылок выше). Вы можете обнаружить, что это дает вам множество отличных вариантов того, что вы пытаетесь сделать, и вам даже не нужно беспокоиться о регулировании рабочей очереди.

Ответ 3

Другая идея сделать это просто:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

Затем используйте BlockingQueue<QueueableComplexObject> в качестве очереди. Если вы хотите завершить обработку очереди, сделайте queue.offer(NullComplexObject.INSTANCE). Со стороны потребителя сделайте

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

Нет instanceof, и вам не нужно создавать поддельные ComplexObject, которые могут быть дорогими/сложными в зависимости от его реализации.

Ответ 4

Другая возможность создания ядовитого объекта: сделайте его конкретным экземпляром класса. Таким образом, вам не нужно обманывать подтипы или испортить общий характер.

Недостаток: это не сработает, если между производителем и потребителем есть какой-то барьер сериализации.

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}

Ответ 5

Возможно ли расширить объект ComplexObject и выкрикнуть нетривиальную функциональность создания? По сути, у вас заканчивается объект оболочки, но вы можете сделать, затем instance of, чтобы увидеть, является ли конец объекта очереди.

Ответ 6

Вы можете обернуть ваш общий объект в объект данных. В этом объекте данных вы можете добавить дополнительные данные, такие как статус отравляющего объекта. Dataobject - это класс с двумя полями. T complexObject; и boolean poison;.

Ваш потребитель берет объекты данных из очереди. Если возвращен ядовитый объект, вы закрываете его, иначе вы разворачиваете общий и вызываете "процесс (complexObject)".

Я использую java.util.concurrent.LinkedBlockingDeque<E>, чтобы вы могли добавить объект в конце очереди и перенести его с фронта. Таким образом, ваш объект будет обработан по порядку, но более важно безопасно закрыть очередь после запуска в ядовитый объект.

Чтобы поддерживать несколько потребителей, я добавляю объект яда обратно в очередь, когда я нахожу его.

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}

Ответ 7

В этой ситуации вам, как правило, приходится отказываться от дженериков и делать объект хранения в очереди. то вам просто нужно проверить свой "ядовитый объект" перед тем, как придать его фактическому типу.

Ответ 8

Мне кажется разумным реализовать тесный BlockingQueue:

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

Было бы довольно просто реализовать это со следующими формами поведения (см. сводная таблица методов):

  • Вставить

    • Выдает исключение, Специальное значение:

      Ведет себя как полная Queue, ответственность вызывающего абонента для тестирования isClosed().

    • Блокировки:

      Выбрасывает IllegalStateException, если и когда закрыт.

    • Тайм-аут:

      Возвращает false, если и когда закрыто, ответственность вызывающего абонента для проверки isClosed().

  • Удалить

    • Выдает исключение, Специальное значение:

      Ведет себя как пустая Queue, ответственность вызывающего абонента для тестирования isClosed().

    • Блокировки:

      Выбрасывает NoSuchElementException, если и при закрытии.

    • Тайм-аут:

      Возвращает null, если и когда закрыто, ответственность вызывающего абонента для проверки isClosed().

  • Исследовать

    Без изменений.

Я сделал это, отредактировав источник, найдя его на github.com.

Ответ 9

Я использовал эту систему:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

Когда заканчивается производитель, я установил в поле consumerObject, queueIsNotEmpty значение false

Ответ 10

Сегодня я решил эту проблему, используя объект-оболочку. Поскольку ComplexObject слишком сложный для подкласса, я завернул объект ComplexObject в объект ComplexObjectWrapper. Затем используется ComplexObjectWrapper как общий тип.

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

Теперь вместо BlockingQueue<ComplexObject> я сделал   BlockingQueue<ComplexObjectWrapper>

Поскольку у меня был контроль над Потребителем и Продюсером, это решение сработало для меня.