Какой лучший способ подождать (без вращения), пока что-то не будет доступно ни в одной из двух (многопроцессорных) Queues, где оба находятся на одна и та же система?
"выбрать" в нескольких очередях многопроцессорности Python?
Ответ 1
Не похоже, что есть официальный способ справиться с этим. Или, по крайней мере, не на основе этого:
Вы можете попробовать что-то вроде того, что делает этот пост - доступ к файлам с файлами базового канала:
а затем используйте select.
Ответ 2
На самом деле вы можете использовать объекты multiprocessing.Queue в select.select. т.е.
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
будет выбирать que только в том случае, если он готов к чтению.
Никакой документации об этом, хотя. Я читал исходный код библиотеки multiprocessing.queue(в linux обычно он похож на /usr/lib/python 2.6/multiprocessing/queue.py), чтобы узнать это.
С Queue.Queue Я не нашел никакого умного способа сделать это (и мне очень понравилось).
Ответ 3
Похоже, что использование потоков, которые пересылают входящие элементы в одну очередь, которую вы затем ждете, является практическим выбором при использовании многопроцессорности независимо от платформы.
Избегая потоков, требуется либо обработка низкоуровневых протоколов /FD, которые являются как специфичными для платформы, так и непростыми для обработки последовательно с API более высокого уровня.
Или вам понадобятся очереди с возможностью устанавливать обратные вызовы, которые, я думаю, являются надлежащим интерфейсом более высокого уровня. То есть вы должны написать что-то вроде:
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
Возможно, многопроцессорный пакет может вырастить этот API, но он еще не существует. Концепция хорошо работает с py.execnet, которая использует термин "канал" вместо "очередей", см. Здесь http://tinyurl.com/nmtr4w
Ответ 4
Вы можете использовать что-то вроде шаблона Observer, при этом абоненты очереди уведомляются об изменениях состояния.
В этом случае у вас может быть ваш рабочий поток, назначенный в качестве слушателя в каждой очереди, и всякий раз, когда он получает сигнал готовности, он может работать над новым элементом, в противном случае спя.
Ответ 5
Не уверен, насколько хорошо выбор в многопроцессорной очереди работает на окнах. Поскольку выбор в Windows прослушивает сокеты, а не файлы, я подозреваю, что могут быть проблемы.
Мой ответ - сделать поток, чтобы прослушивать каждую очередь в блокирующем режиме и помещать результаты в одну очередь, прослушанную основным потоком, по существу мультиплексируя отдельные очереди в один.
Мой код для этого:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
В следующей тестовой процедуре показано, как ее использовать:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
Надеюсь, что это поможет.
Тони Уоллес
Ответ 6
Новая версия кода выше...
Не уверен, насколько хорошо выбор в многопроцессорной очереди работает на окнах. Поскольку выбор в Windows прослушивает сокеты, а не файлы, я подозреваю, что могут быть проблемы.
Мой ответ - сделать поток, чтобы прослушивать каждую очередь в блокирующем режиме и помещать результаты в одну очередь, прослушанную основным потоком, по существу мультиплексируя отдельные очереди в один.
Мой код для этого:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Следующий код - это моя тестовая процедура, чтобы показать, как это работает:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
Ответ 7
Не делай этого.
Поместите заголовок сообщений и отправьте их в общую очередь. Это упрощает код и будет более чистым в целом.