Подтвердить что ты не робот

Несколько очередей блокировки, один потребитель

У меня есть несколько BlockingQueues, содержащих отправленные сообщения. Возможно ли иметь меньше потребителей, чем очередей? Я не хочу перебирать очереди и продолжать опрос (ожидание), и я не хочу нить для каждой очереди. Вместо этого я хотел бы иметь один поток, который пробуждается, когда сообщение доступно в любой из очередей.

4b9b3361

Ответ 1

Один трюк, который вы можете сделать, - это очередь очередей. Итак, что бы вы сделали, у вас есть одна блокирующая очередь, на которую подписаны все потоки. Затем, когда вы вставляете что-то в один из ваших BlockingQueues, вы также ставите очередь блокировки в этой отдельной очереди. Таким образом, у вас будет что-то вроде:

BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();

Затем, когда вы получите новый рабочий элемент:

void addWorkItem(int queueIndex, WorkItem workItem) {
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
    //Note: You may want to make the two operations a single atomic operation
    producers[queueIndex].add(workItem);
    producerProducer.add(producers[queueIndex]);
}

Теперь ваши потребители могут заблокировать производителя. Я не уверен, насколько ценна эта стратегия, но она выполняет то, что вы хотите.

Ответ 2

LinkedBlockingMultiQueue делает то, что вы просите. Это не позволяет потребителю блокировать произвольные BlockingQueues, но можно создавать "подпоследовательности" из одной "многопоследовательности" и достигать того же эффекта. Производители предлагают в под-очередях, и потребители могут блокировать себя, опрашивая единственную множественную очередь, ожидая любого элемента.

Он также поддерживает приоритеты, то есть отбирает элементы из одних очередей, а затем рассматривает другие.

Пример:

LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);

Тогда вы можете предложить и опросить:

sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"

Отказ от ответственности: я являюсь автором библиотеки.

Ответ 3

Полиморфизм

Эту проблему можно решить с помощью полиморфизма. Представьте, что у вас есть две работы с именами JobOne и JobTwo, и вы собираетесь их использовать потребителю. Вы должны определить интерфейс под названием jobs, например:

interface Jobs {
    ...
}

А затем вы можете реализовать свои задания с помощью этого интерфейса, например:

class JobOne implements Job {
    ...
}

class JobTwo implements Job {
    ...
}

Теперь вы можете использовать Jobs для своей очереди блокировки, чтобы добавить два вида работ для потребления.

BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();