Подтвердить что ты не робот

Кто выполняет обратный вызов при использовании метода apply_async для многопроцессорного пула?

Я пытаюсь понять немного о том, что происходит за кулисами при использовании метода apply_sync для многопроцессорного пула.

Кто запускает метод обратного вызова? Это основной процесс, называемый apply_async?

Скажем, я отправляю целую группу команд apply_async с обратными вызовами, а затем продолжаю свою программу. Моя программа все еще делает что-то, когда apply_async начинает заканчиваться. Как обратный вызов запускает мой "основной процесс", в то время как основной процесс все еще занят script?

Вот пример.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

Результат выглядит примерно так:

PoolWorker-1 работает func с arg 0

PoolWorker-2 работает func с arg 1

PoolWorker-3 работает func с arg 2

MainProcess собирается спать в течение минуты. Основной процесс занят.

PoolWorker-4 работает func с arg 3

PoolWorker-1 работает func с arg 4

PoolWorker-2 работает func с arg 5

PoolWorker-3 работает func с arg 6

PoolWorker-4 работает func с arg 7

MainProcess запускает обратный вызов с помощью arg 0 < - main обрабатывает обратный вызов, пока он все еще находится в цикле while.

MainProcess работает с обратным вызовом с помощью arg 1

MainProcess работает с обратным вызовом с arg 2

MainProcess работает с обратным вызовом с arg 3

MainProcess работает с обратным вызовом с arg 4

PoolWorker-1 работает func с arg 8

...

Закончено с помощью script

Как работает MainProcess с обратным вызовом, когда он находится в середине цикла while?

Это утверждение о обратном вызове в документации для multiprocessing.Pool, которое похоже на подсказку, но я этого не понимаю.

apply_async (func [, args [, kwds [, callback]]])

Вариант метода apply(), который возвращает объект результата.

Если обратный вызов указан, он должен быть вызываемым, который принимает один аргумент. Когда результат будет готов, к нему будет применен обратный вызов (если вызов не завершился). обратный вызов должен завершиться немедленно, так как иначе поток, обрабатывающий результаты, будет заблокирован.

4b9b3361

Ответ 1

В документах действительно есть подсказка:

обратный вызов должен завершиться сразу после иначе поток, который обрабатывает результаты, будет заблокирован.

Обратные вызовы обрабатываются в основном процессе, но они запускаются в отдельном потоке. Когда вы создаете Pool, он фактически создает несколько Thread объектов внутри:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

Интересная тема для нас - _result_handler; мы скоро поймем.

Переключение передач на секунду, когда вы запускаете apply_async, он создает объект ApplyResult внутри, чтобы управлять получением результата от дочернего элемента:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

Как вы можете видеть, метод _set - это тот, который завершает фактическое выполнение переданного callback, если задача выполнена успешно. Также обратите внимание, что он добавляет себя к глобальному cache dict в конце __init__.

Теперь вернемся к объекту нити _result_handler. Этот объект вызывает функцию _handle_results, которая выглядит так:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

Это цикл, который просто извлекает результаты из дочерних элементов из очереди, находит запись для него в cache и вызывает _set, который выполняет наш обратный вызов. Он способен работать, даже если вы находитесь в цикле, потому что он не работает в основном потоке.