Подтвердить что ты не робот

Общие сведения о многопроцессорности: управление общей памятью, блокировки и очереди в Python

Multiprocessing - это мощный инструмент в python, и я хочу его более глубоко понять. Я хочу знать, когда использовать регулярные Locks и Queues и когда использовать многопроцессорный Manager, чтобы делиться ими среди всех процессов.

Я придумал следующие тестовые сценарии с четырьмя различными условиями для многопроцессорности:

  • Использование пула и НЕТ Диспетчер

  • Использование пула и менеджера

  • Использование отдельных процессов и НЕТ Менеджер

  • Использование отдельных процессов и диспетчера

Работа

Все условия выполняют функцию задания the_job. the_job состоит из некоторой печати, которая закреплена блокировкой. Более того, ввод функции просто помещается в очередь (чтобы проверить, можно ли ее восстановить из очереди). Этот ввод представляет собой просто индекс idx из range(10), созданный в основном script, называемом start_scenario (показан внизу).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

Успех условия определяется как прекрасно напоминающий ввод из очереди, см. функцию read_queue внизу.

Условия

Условие 1 и 2 довольно самоочевидны. Условие 1 включает в себя создание блокировки и очереди и передачу их в пул процессов:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(Вспомогательная функция make_iterator приведена в нижней части этого сообщения.) Условия 1 терпят неудачу с RuntimeError: Lock objects should only be shared between processes through inheritance.

Условие 2 довольно похоже, но теперь блокировка и очередь находятся под наблюдением менеджера:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

В состоянии 3 новых процесса запускаются вручную, а блокировка и очередь создаются без менеджера:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

Условие 4 похоже, но опять же с использованием менеджера:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

В обоих условиях - 3 и 4 - я начинаю новый процесс для каждой из 10 задач the_job с большинством ncores процессов работающих в одно и то же время. Это достигается со следующей вспомогательной функцией:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

Результат

Только условие 1 терпит неудачу (RuntimeError: Lock objects should only be shared between processes through inheritance), тогда как остальные 3 условия успешны. Я пытаюсь обвести голову вокруг этого результата.

Почему пул должен обмениваться блокировкой и очередью между всеми процессами, а отдельные процессы из условия 3 - нет?

Я знаю, что для условий пула (1 и 2) все данные из итераторов передаются через травление, тогда как в условиях одного процесса (3 и 4) все данные из итераторов передаются путем наследования от основного процесса (Я использую Linux). Я думаю, до тех пор, пока память не будет изменена из дочернего процесса, к той же памяти, к которой применяется родительский процесс, обращается (копирование на запись). Но как только кто-то говорит lock.acquire(), это должно быть изменено, а дочерние процессы используют разные блокировки, размещенные где-то еще в памяти, не так ли? Как один дочерний процесс знает, что брат активировал блокировку, которая не делится через менеджера?

Наконец, несколько связанный мой вопрос, сколько разных условий 3 и 4. У обоих есть отдельные процессы, но они различаются в использовании менеджера. Оба считается действительным кодом? Или следует избегать использования менеджера, если на самом деле нет необходимости в нем?


Полный Script

Для тех, кто просто хочет скопировать и вставить все для выполнения кода, вот полный script:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()
4b9b3361

Ответ 1

multiprocessing.Lock реализуется с использованием объекта Semaphore, предоставляемого ОС. В Linux ребенок просто наследует дескриптор семафора от родителя через os.fork. Это не копия семафора; он фактически наследует тот же дескриптор, который имеет родитель, так же, как и дескрипторы файла. Windows, с другой стороны, не поддерживает os.fork, поэтому он должен разжечь Lock. Это делается путем создания дублирующего дескриптора для Семафора Windows, используемого внутри объекта multiprocessing.Lock, с использованием API DuplicateHandle, который гласит:

Двойной дескриптор относится к тому же объекту, что и исходный дескриптор. Поэтому любые изменения объекта отражаются через оба ручки

API DuplicateHandle позволяет вам передать права на дублированный дескриптор дочернему процессу, чтобы дочерний процесс действительно мог использовать его после его разблокировки. Создав дублированный дескриптор, принадлежащий дочернему элементу, вы можете эффективно "делить" объект блокировки.

Здесь объект семафора в multiprocessing/synchronize.py

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

Обратите внимание на вызов assert_spawning в __getstate__, который вызывается при травлении объекта. Вот как это реализовано:

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

Эта функция гарантирует, что вы "наследуете" Lock, вызывая thread_is_spawning. В Linux этот метод просто возвращает False:

@staticmethod
def thread_is_spawning():
    return False

Это связано с тем, что Linux не нуждается в pickle для наследования Lock, поэтому, если __getstate__ фактически вызывается в Linux, мы не должны наследовать. В Windows больше происходит:

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

Здесь thread_is_spawning возвращает True, если объект Popen._tls имеет атрибут process_handle. Мы видим, что атрибут process_handle создается в __init__, тогда данные, которые мы хотим унаследовать, передаются от родителя к дочернему с помощью dump, тогда атрибут удаляется. Таким образом, thread_is_spawning будет только True во время __init__. Согласно этот поток списков рассылки python-идей, на самом деле это искусственное ограничение, добавленное для моделирования того же поведения, что и os.fork в Linux. Windows фактически может поддерживать передачу Lock в любое время, потому что DuplicateHandle может быть запущен в любое время.

Все вышеизложенное относится к объекту Queue, потому что он использует Lock внутренне.

Я бы сказал, что наследование объектов Lock предпочтительнее использовать Manager.Lock(), потому что, когда вы используете Manager.Lock, каждый отдельный вызов, который вы делаете в Lock, должен быть отправлен через IPC в Manager процесс, который будет намного медленнее, чем использование общей Lock, которая живет внутри вызывающего процесса. Оба подхода являются вполне допустимыми.

Наконец, можно передать Lock всем членам Pool без использования Manager, используя аргументы ключевого слова initializer/initargs:

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

Это работает, потому что аргументы, переданные в initargs, передаются методу __init__ объектов Process, которые выполняются внутри Pool, поэтому они в конечном итоге наследуются, а не маринуются.