Подтвердить что ты не робот

"выбрать" в нескольких очередях многопроцессорности Python?

Какой лучший способ подождать (без вращения), пока что-то не будет доступно ни в одной из двух (многопроцессорных) Queues, где оба находятся на одна и та же система?

4b9b3361

Ответ 1

Не похоже, что есть официальный способ справиться с этим. Или, по крайней мере, не на основе этого:

Вы можете попробовать что-то вроде того, что делает этот пост - доступ к файлам с файлами базового канала:

а затем используйте select.

Ответ 2

На самом деле вы можете использовать объекты multiprocessing.Queue в select.select. т.е.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

будет выбирать que только в том случае, если он готов к чтению.

Никакой документации об этом, хотя. Я читал исходный код библиотеки multiprocessing.queue(в linux обычно он похож на /usr/lib/python 2.6/multiprocessing/queue.py), чтобы узнать это.

С Queue.Queue Я не нашел никакого умного способа сделать это (и мне очень понравилось).

Ответ 3

Похоже, что использование потоков, которые пересылают входящие элементы в одну очередь, которую вы затем ждете, является практическим выбором при использовании многопроцессорности независимо от платформы.

Избегая потоков, требуется либо обработка низкоуровневых протоколов /FD, которые являются как специфичными для платформы, так и непростыми для обработки последовательно с API более высокого уровня.

Или вам понадобятся очереди с возможностью устанавливать обратные вызовы, которые, я думаю, являются надлежащим интерфейсом более высокого уровня. То есть вы должны написать что-то вроде:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Возможно, многопроцессорный пакет может вырастить этот API, но он еще не существует. Концепция хорошо работает с py.execnet, которая использует термин "канал" вместо "очередей", см. Здесь http://tinyurl.com/nmtr4w

Ответ 4

Вы можете использовать что-то вроде шаблона Observer, при этом абоненты очереди уведомляются об изменениях состояния.

В этом случае у вас может быть ваш рабочий поток, назначенный в качестве слушателя в каждой очереди, и всякий раз, когда он получает сигнал готовности, он может работать над новым элементом, в противном случае спя.

Ответ 5

Не уверен, насколько хорошо выбор в многопроцессорной очереди работает на окнах. Поскольку выбор в Windows прослушивает сокеты, а не файлы, я подозреваю, что могут быть проблемы.

Мой ответ - сделать поток, чтобы прослушивать каждую очередь в блокирующем режиме и помещать результаты в одну очередь, прослушанную основным потоком, по существу мультиплексируя отдельные очереди в один.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

В следующей тестовой процедуре показано, как ее использовать:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Надеюсь, что это поможет.

Тони Уоллес

Ответ 6

Новая версия кода выше...

Не уверен, насколько хорошо выбор в многопроцессорной очереди работает на окнах. Поскольку выбор в Windows прослушивает сокеты, а не файлы, я подозреваю, что могут быть проблемы.

Мой ответ - сделать поток, чтобы прослушивать каждую очередь в блокирующем режиме и помещать результаты в одну очередь, прослушанную основным потоком, по существу мультиплексируя отдельные очереди в один.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

Следующий код - это моя тестовая процедура, чтобы показать, как это работает:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)

Ответ 7

Не делай этого.

Поместите заголовок сообщений и отправьте их в общую очередь. Это упрощает код и будет более чистым в целом.