У меня есть несколько BlockingQueues, содержащих отправленные сообщения. Возможно ли иметь меньше потребителей, чем очередей? Я не хочу перебирать очереди и продолжать опрос (ожидание), и я не хочу нить для каждой очереди. Вместо этого я хотел бы иметь один поток, который пробуждается, когда сообщение доступно в любой из очередей.
Несколько очередей блокировки, один потребитель
Ответ 1
Один трюк, который вы можете сделать, - это очередь очередей. Итак, что бы вы сделали, у вас есть одна блокирующая очередь, на которую подписаны все потоки. Затем, когда вы вставляете что-то в один из ваших BlockingQueues, вы также ставите очередь блокировки в этой отдельной очереди. Таким образом, у вас будет что-то вроде:
BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();
Затем, когда вы получите новый рабочий элемент:
void addWorkItem(int queueIndex, WorkItem workItem) {
assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
//Note: You may want to make the two operations a single atomic operation
producers[queueIndex].add(workItem);
producerProducer.add(producers[queueIndex]);
}
Теперь ваши потребители могут заблокировать производителя. Я не уверен, насколько ценна эта стратегия, но она выполняет то, что вы хотите.
Ответ 2
LinkedBlockingMultiQueue делает то, что вы просите. Это не позволяет потребителю блокировать произвольные BlockingQueues, но можно создавать "подпоследовательности" из одной "многопоследовательности" и достигать того же эффекта. Производители предлагают в под-очередях, и потребители могут блокировать себя, опрашивая единственную множественную очередь, ожидая любого элемента.
Он также поддерживает приоритеты, то есть отбирает элементы из одних очередей, а затем рассматривает другие.
Пример:
LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);
Тогда вы можете предложить и опросить:
sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"
Отказ от ответственности: я являюсь автором библиотеки.
Ответ 3
Полиморфизм
Эту проблему можно решить с помощью полиморфизма. Представьте, что у вас есть две работы с именами JobOne
и JobTwo
, и вы собираетесь их использовать потребителю. Вы должны определить интерфейс под названием jobs, например:
interface Jobs {
...
}
А затем вы можете реализовать свои задания с помощью этого интерфейса, например:
class JobOne implements Job {
...
}
class JobTwo implements Job {
...
}
Теперь вы можете использовать Jobs
для своей очереди блокировки, чтобы добавить два вида работ для потребления.
BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();