Подтвердить что ты не робот

Многопроцессор python - процесс зависает при соединении для большой очереди

Я запускаю python 2.7.3, и я заметил следующее странное поведение. Рассмотрим этот минимальный пример:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()
    worker = Process(target=foo,args=(qin,qout))
    worker.start()

    for i in range(100000):
        print i
        sys.stdout.flush()
        qin.put(i**2)

    qin.put(None)
    worker.join()

Когда я зацикливаю более 10 000 или более, мой script зависает на worker.join(). Он отлично работает, когда цикл достигает 1000.

Любые идеи?

4b9b3361

Ответ 1

Очередь qout в подпроцессе заполняется. Данные, которые вы ввели в нее из foo(), не подходят в буфере внутренних линий OS, поэтому блоки подпроцессов пытаются подобрать больше данных. Но родительский процесс не читает эти данные: он просто заблокирован, ожидая завершения подпроцесса. Это типичный тупик.

Ответ 2

Там должно быть ограничение на размер очередей. Рассмотрим следующую модификацию:

from multiprocessing import Process, Queue

def foo(qin,qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar':bar})

if __name__=='__main__':
    import sys

    qin=Queue()
    qout=Queue()   ## POSITION 1
    for i in range(100):
        #qout=Queue()   ## POSITION 2
        worker=Process(target=foo,args=(qin,))
        worker.start()
        for j in range(1000):
            x=i*100+j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'

Это работает как есть (с qout.put строкой qout.put). Если вы попытаетесь сохранить все 100000 результатов, то qout станет слишком большим: если я раскомментирую qout.put({'bar':bar}) в foo и оставлю определение qout в ПОЛОЖЕНИИ 1, код зависнет. Если, однако, я qout определение qout в ПОЛОЖЕНИЕ 2, сценарий завершится.

Короче говоря, вы должны быть осторожны, чтобы ни qin ни qout становились слишком большими. (См. Также: Максимальный размер очереди многопроцессорной обработки - 32767)

Ответ 3

У меня была та же проблема на python3, когда вы пытались поместить строки в очередь общего размера около 5000 cahrs.

В моем проекте был хост-процесс, который устанавливает очередь и запускает подпроцесс, затем присоединяется. Afrer join хост-процесс читает форму очереди. Когда подпроцесс обрабатывает слишком много данных, хост висит на join. Я исправил это, используя следующую функцию для ожидания подпроцесса в хост-процессе:

def yield_from_process(q, p):
    while p.is_alive():
        p.join(timeout=1)
        while True:
            try:
                yield q.get(block=False)
            except Empty:
                break

Я читаю из очереди, как только он заполняется, поэтому он никогда не становится очень большим

Ответ 4

Я пытался .get() асинхронный работник после закрытия пула

ошибка отступа вне блока с

у меня было это

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
# wrong
for async_result in async_results:
    yield async_result.get()

мне это нужно

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
    # right
    for async_result in async_results:
        yield async_result.get()