Подтвердить что ты не робот

Блокировка очереди и многопоточный потребитель, как узнать, когда остановиться

У меня есть один производитель нитей, который создает некоторые объекты задачи, которые затем добавляются в ArrayBlockingQueue (который имеет фиксированный размер).

Я также запускаю многопоточный потребитель. Он создается как фиксированный пул потоков (Executors.newFixedThreadPool(threadCount);). Затем я отправляю некоторые запросы ConsumerWorker на этот threadPool, каждый из пользователей ConsumerWorker имеет отношение к вышеупомянутому экземпляру ArrayBlockingQueue.

Каждый такой Рабочий выполнит take() в очереди и обработает задачу.

Моя проблема в том, что лучший способ узнать Работника, когда больше не будет сделано. Другими словами, как я могу сообщить Рабочим, что продюсер закончил добавлять в очередь, и с этого момента каждый рабочий должен остановиться, когда увидит, что очередь пуста.

Теперь у меня есть настройка, в которой мой Producer инициализируется обратным вызовом, который запускается, когда он заканчивает работу (добавление материала в очередь). Я также сохраняю список всех созданных вами ConsumerWorkers и отправленных в ThreadPool. Когда Обратный звонок Продюсера говорит мне, что производитель сделан, я могу рассказать об этом каждому из рабочих. В этот момент они должны просто продолжать проверять, не является ли очередь пустой, и когда она становится пустой, они должны остановиться, что позволит мне изящно закрыть пул потоков ExecutorService. Это что-то вроде этого

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

Проблема здесь в том, что у меня есть очевидное состояние гонки, когда иногда продюсер заканчивает, сигнализирует об этом, а ConsumerWorkers останавливается, прежде чем потреблять все в очереди.

Мой вопрос - лучший способ синхронизировать это, чтобы все работало нормально? Должен ли я синхронизировать всю часть, где он проверяет, работает ли производитель, плюс, если очередь пуста, плюс взять что-то из очереди в одном блоке (на объекте очереди)? Должен ли я просто синхронизировать обновление isRunning boolean на экземпляре ConsumerWorker? Любое другое предложение?

ОБНОВЛЕНИЕ, ЗДЕСЬ РАБОЧАЯ РЕАЛИЗАЦИЯ, ЧТО Я ЗАВЕРШЕНО ИСПОЛЬЗОВАТЬ:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Это было сильно вдохновлено ответом JohnVint ниже, с некоторыми незначительными изменениями.

=== Обновление из-за комментария @vendhan.

Спасибо за ваше ожидание. Вы правы, первый фрагмент кода в этом вопросе имеет (среди прочих вопросов) тот, где while(isRunning || !inputQueue.isEmpty()) действительно не имеет смысла.

В моей фактической окончательной реализации этого я делаю то, что ближе к вашему предложению заменить "||" (или) с "& &" (и) в том смысле, что каждый рабочий (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой таблеткой, и если так прекращается (теоретически мы можем сказать, что рабочий должен работать И очередь должна не пусто).

4b9b3361

Ответ 1

Вы должны продолжить take() из очереди. Вы можете использовать ядовитую таблетку, чтобы заставить работника остановиться. Например:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

Ответ 2

Я отправил работникам специальный рабочий пакет, чтобы сообщить, что они должны отключиться:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

Чтобы остановить рабочих, просто добавьте ConsumerWorker.DONE в очередь.

Ответ 3

В вашем кодовом блоке, где вы пытаетесь извлечь элемент из очереди, используйте poll(time,unit) вместо take().

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

Указывая соответствующие значения таймаута, вы гарантируете, что потоки не будут блокировать, если есть неудачная последовательность

  • isRunning истинно
  • Очередь становится пустой, поэтому потоки вводят заблокированное ожидание (при использовании take()
  • isRunning установлено значение false

Ответ 4

Существует несколько стратегий, которые можно использовать, но один простой - иметь подкласс задачи, который сигнализирует о завершении задания. Производитель не посылает этот сигнал напрямую. Вместо этого он создает экземпляр этого подкласса задачи. Когда один из ваших потребителей снимает эту задачу и выполняет ее, это вызывает отправку сигнала.

Ответ 5

Мне пришлось использовать многопоточный производитель и многопоточный потребитель. Я закончил с схемой Scheduler -- N Producers -- M Consumers, каждая из двух обменивается данными через очередь (всего две очереди). Планировщик заполняет первую очередь запросами на получение данных, а затем заполняет их "ядовитыми таблетками". Существует счетчик активных производителей (атомный int), а последний производитель, который получает последнюю ядовитую таблетку, отправляет M ядовитые таблетки в очередь потребителей.

Ответ 6

Не можем ли мы сделать это с помощью CountDownLatch, где размер - это количество записей в продюсере. И каждый потребитель будет countDown после обработки записи. И он пересекает метод awaits(), когда все задачи завершены. Затем прекратите все потребители ур. Поскольку все записи обрабатываются.