Подтвердить что ты не робот

Каков самый чистый способ остановить работу многопроцессора python, подключенного к очереди в бесконечном цикле?

Я реализую шаблон производителя-потребителя в python, используя multiprocessing.Pool и multiprocessing.Queue. Потребители представляют собой предварительно разветвленные процессы, которые используют gevent для создания нескольких задач.

Вот отредактированная версия кода:

import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time

# Task queue
queue = Queue()

def init_worker ():
    # Ignore signals in worker
    signal.signal( signal.SIGTERM, signal.SIG_IGN )
    signal.signal( signal.SIGINT, signal.SIG_IGN )
    signal.signal( signal.SIGQUIT, signal.SIG_IGN )

# One of the worker task
def worker_task1( ):
    while True:
        try:
            m = queue.get( timeout = 2 )

            # Break out if producer says quit
            if m == 'QUIT':
                print 'TIME TO QUIT'
                break

        except QueueEmpty:
            pass

# Worker
def work( ):
    gevent.joinall([
        gevent.spawn( worker_task1 ),
    ])

pool = Pool( 2, init_worker )
for i in xrange( 2 ):
    pool.apply_async( work )

try:
    while True:
        queue.put( 'Some Task' )
        time.sleep( 2 )

except KeyboardInterrupt as e:
    print 'STOPPING'

    # Signal all workers to quit
    for i in xrange( 2 ):
        queue.put( 'QUIT' )

    pool.join()

Теперь, когда я пытаюсь выйти из него, я получаю следующее состояние:

  • Родительский процесс ждет присоединения одного из дочерних элементов.
  • Один из детей находится в состоянии невозврата. Так закончил, но родитель ждет, пока другой ребенок закончит.
  • Показывается другой ребенок: futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ....

Итак, как правильно закончить такой процесс?

4b9b3361

Ответ 1

Я понял проблему. Согласно документации для multiprocessing.Pool.join(), pool должен быть close()ed, прежде чем он может быть join()ed. Добавление pool.close() до pool.join() разрешило проблему.