Подтвердить что ты не робот

Java BlockingQueue не имеет блокировки?

У меня есть блокирующая очередь объектов.

Я хочу написать поток, который блокируется до тех пор, пока в очереди не будет объекта. Аналогично функциональности, предоставляемой BlockingQueue.take().

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.

Итак, мне нужна функция блокировки peek(). В настоящее время peek() просто возвращает, если очередь пуста в соответствии с javadocs.

Я что-то упустил? Есть ли другой способ достичь этой функциональности?

EDIT:

Любые мысли, если я просто использовал поточную безопасную очередь и заглянул и спал вместо этого?

public void run() {
    while (!__exit) {
        while (__queue.size() != 0) {
            Object o =  __queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    __queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

Обратите внимание, что у меня есть только один поток потребителей и один (отдельный) поток производителей. Я думаю, это не так эффективно, как использование BlockingQueue... Любые комментарии оценены.

4b9b3361

Ответ 1

Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast()), но снова заменить его на конец очереди, если обработка завершилась неудачей с помощью putLast(E e). Между тем ваши "продюсеры" добавят элементы в фронт очереди с помощью putFirst(E e).

Вы всегда можете инкапсулировать это поведение в свою собственную реализацию Queue и предоставить метод blockingPeek(), который выполняет takeLast(), за которым следует putLast() за кулисами на базовом LinkedBlockingDeque. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.

Ответ 2

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.

В общем, он не является потокобезопасным. Что делать, если после peek() и определить, что объект может быть успешно обработан, но перед тем, как take() его удалить и обработать, другой поток принимает этот объект?

Ответ 3

Единственное, что я знаю об этом, это BlockingBuffer в Коллекции сообщества Apache:

Если вызов get или remove вызывается пустой буфер, вызывающий поток ожидает уведомления о том, что добавление или Операция addAll завершена.

get() эквивалентен peek(), а a Buffer можно сделать как BlockingQueue, украсив UnboundedFifoBuffer с a BlockingBuffer

Ответ 4

Не могли бы вы просто добавить очередь прослушивателя событий в свою очередь блокировки, а затем, когда что-то добавлено в (блокирующую) очередь, отправьте событие своим слушателям? Вы могли бы заблокировать поток, пока не будет вызван метод actionPerformed.

Ответ 5

Быстрый ответ заключается в том, что на самом деле нет способа заблокировать просмотр, бар, реализующий блокирующую очередь с блокировкой peek() самостоятельно.

Я что-то пропустил?

peek() может быть неприятным с concurrency -

  • Если вы не можете обработать сообщение peek() 'd - оно будет оставлено в очереди, если у вас несколько пользователей.
  • Кто собирается получить этот объект из очереди, если вы не можете его обработать?
  • Если у вас несколько потребителей, вы получаете условие гонки между вами peek() 'ing и другим потоком также обрабатываете элементы, что приводит к дублированию обработки или хуже.

Похоже, вам может быть лучше удалить элемент и обработать его, используя Шаблон цепочки ответственности

Изменить: re: ваш последний пример: если у вас есть только один потребитель, вы никогда не избавитесь от объекта в очереди - если он не обновится в среднем времени - в этом случае вам лучше быть очень осторожным безопасность потока и, вероятно, не должны помещать элемент в очередь в любом случае.

Ответ 6

Похоже, что у BlockingQueue нет функции, которую вы указываете.

Возможно, я попытаюсь немного заново создать проблему: что бы вы делали с объектами, которые вы не можете "правильно обработать"? Если вы просто оставите их в очереди, вам придется в какой-то момент вытащить их и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get() дает какое-либо недопустимое или плохое значение, вы, вероятно, можете просто отбросить его на пол) или выбрать другую структуру данных, чем FIFO.