Подтвердить что ты не робот

Пул процессов Python не является демоническим?

Можно ли создать пул Python, который не является демоническим? Я хочу, чтобы пул мог вызывать функцию, в которой есть другой пул.

Я хочу этого, потому что демоны не могут создать процесс. В частности, это приведет к ошибке:

AssertionError: daemonic processes are not allowed to have children

Например, рассмотрим сценарий, в котором у function_a есть пул, который выполняет function_b которого есть пул, который выполняет function_c. Эта цепочка функций завершится ошибкой, потому что function_b запускается в процессе демона, а процессы демона не могут создавать процессы.

4b9b3361

Ответ 1

Класс multiprocessing.pool.Pool создает рабочие процессы в своем методе __init__, делает их демоническими и запускает их, и невозможно восстановить их атрибут daemon до False до их запуска (и впоследствии это больше не допускается). Но вы можете создать свой собственный подкласс класса multiprocesing.pool.Pool (multiprocessing.Pool - это просто функция-обертка) и заменить свой собственный подкатегорий multiprocessing.Process, который всегда не является демоном, который будет использоваться для рабочих процессов.

Вот полный пример того, как это сделать. Важными компонентами являются два класса NoDaemonProcess и MyPool вверху и для вызова pool.close() и pool.join() в вашем экземпляре MyPool в конце.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child pool are killed when the child is terminated, but it good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

Ответ 2

Многопроцессорный модуль имеет приятный интерфейс для использования пулов с процессами или потоками. В зависимости от вашего текущего варианта использования, вы можете рассмотреть возможность использования multiprocessing.pool.ThreadPool для вашего внешнего пула, что приведет к созданию потоков (которые позволяют порождать процессы изнутри), а не процессам.

Это может быть ограничено GIL, но в моем конкретном случае (я тестировал оба) время запуска процессов из внешнего Pool созданное здесь, значительно перевешивало решение с ThreadPool.


Это действительно легко поменять местами Processes для Threads. Узнайте больше о том, как использовать решение ThreadPool здесь или здесь.

Ответ 3

Проблема, с которой я столкнулась, заключалась в попытке импортировать глобальные переменные между модулями, заставляя линию ProcessPool() обрабатываться несколько раз.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Затем импортируйте безопасно из другого места в свой код

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

Ответ 4

У меня была необходимость использовать недемонический пул в Python 3.7, и в итоге я адаптировал код, опубликованный в принятом ответе. Ниже приведен фрагмент, который создает недемонический пул:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Поскольку текущая реализация multiprocessing была существенно реорганизована, чтобы основываться на контекстах, нам необходимо предоставить класс NoDaemonContext который имеет наш NoDaemonProcess качестве атрибута. MyPool будет использовать этот контекст вместо контекста по умолчанию.

Тем не менее, я должен предупредить, что у этого подхода есть как минимум 2 предостережения:

  1. Это по-прежнему зависит от деталей реализации пакета multiprocessing и поэтому может выйти из строя в любое время.
  2. Существуют веские причины, по которым multiprocessing затрудняет использование недемонических процессов, многие из которых описаны здесь. Наиболее убедительным, на мой взгляд, является:

    Что касается разрешения дочерним потокам порождать своих собственных дочерних элементов, использование подпроцесса сопряжено с риском создания небольшой армии "внуков-зомби", если родительский или дочерний потоки завершаются до завершения и возврата подпроцесса.

Ответ 5

В некоторых версиях Python решение из принятого ответа может вызывать ошибку: AssertionError: group argument must be None for now.

Здесь я нашел решение, которое может помочь:

class NonDaemonPool(multiprocessing.pool.Pool):
    def Process(self, *args, **kwds):
        proc = super(NonDaemonPool, self).Process(*args, **kwds)

        class NonDaemonProcess(proc.__class__):
            """Monkey-patch process to ensure it is never daemonized"""

            @property
            def daemon(self):
                return False

            @daemon.setter
            def daemon(self, val):
                pass

        proc.__class__ = NonDaemonProcess

        return proc