Как прервать BlockingQueue, который блокирует take()?

У меня есть класс, который принимает объекты из BlockingQueue и обрабатывает их, вызывая take() в непрерывном цикле. В какой-то момент я знаю, что в очередь не будет добавлено больше объектов. Как прервать метод take(), чтобы он блокировал блокировку?

Здесь класс, который обрабатывает объекты:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
    } catch (InterruptedException e) {

И здесь метод, который использует этот класс для обработки объектов:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {

  // what code should go here to tell the handler
  // to stop waiting for more objects?

Ответ 1

Если прерывание потока не является вариантом, другое - разместить объект "маркер" или "команда" в очереди, который будет распознан как таковой MyObjHandler и вырваться из цикла.

Ответ 2

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {

Однако, если вы это сделаете, поток может быть прерван, пока в очереди все еще есть элементы, ожидающие обработки. Возможно, вы захотите использовать poll вместо take, что позволит потоку обработки отключиться и завершить работу, когда он ждал в то время как без нового ввода.

Ответ 3

Очень поздно, но надеюсь, что это тоже помогает другим. Я столкнулся с подобной проблемой и использовал подход poll, предложенный erickson выше, некоторые незначительные изменения,

class MyObjHandler implements Runnable 
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
        this.queue = queue;
        Finished = false;
    public void run() 
        while (true) 
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                    // process obj here
                    // ...
                if(Finished && queue.isEmpty())

            catch (InterruptedException e) 

public void testHandler() 
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join

Это позволило решить обе проблемы

  • Отметить BlockingQueue так, чтобы он знал, что ему больше не нужно ждать элементов
  • Не прерывался между ними, так что блоки обработки заканчиваются только тогда, когда обрабатываются все элементы в очереди, и нет элементов, которые еще не добавлены.

Ответ 4

Прервать поток:


Ответ 5

Или не прерывайте, его противно.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {

            return el;
  • Когда производитель помещает объект в очередь, вызовите queue.notify(), если он закончится, вызовите queue.done()
  • loop while (! queue.isDone() ||! queue.isEmpty())
  • test take() возвращает значение для null