Подтвердить что ты не робот

PicklingError при использовании многопроцессорности

У меня возникают проблемы при использовании Pool.map_async() (а также Pool.map()) в модуле многопроцессорности. Я реализовал функцию параллельного цикла, которая работает нормально, пока функция, вводимая в Pool.map_async, является "обычной" функцией. Когда функция является, например, метод для класса, тогда я получаю PicklingError:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Я использую Python только для научных вычислений, поэтому я не очень хорошо знаком с концепцией травления, недавно узнал об этом. Я просмотрел пару предыдущих ответов, например Невозможно рассолить < type 'instancemethod' > при использовании многопроцессорной обработки python Pool.map(), но я не могу понять, как заставить его работать, даже когда вы следуете ссылке, приведенной в ответе.

Мой код был целью имитировать вектор Normal r.v с использованием нескольких ядер. Обратите внимание, что это всего лишь пример, и, возможно, он даже не выигрывает для работы на нескольких ядрах.

import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
    """
    Purpose: Evaluate function using Multiple cores.

    Input:
        func       - Function to evaluate in parallel
        arg        - Array of arguments to evaluate func(arg)  
        static_arg - The "static" argument (if any), i.e. the variables that are      constant in the evaluation of func.
        nWorkers   - Number of Workers to process computations.
    Output:
        func(i, static_arg) for i in args.

    """
    # Prepare arguments for func: Collect arguments with static argument (if any)
    if static_arg != None:
        arguments = [[arg] + static_arg for arg in list(args)]
    else:
        arguments = args

    # Initialize workers
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function
    result = pool.map_async(func, arguments, chunksize = chunksize)
    pool.close()
    pool.join()

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
    def subfunc(a):
        return spstat.norm.rvs(loc = loc, scale = scale, size = a)
    return subfunc

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled
def test(fargs):
    x, a, b = fargs
    return spstat.norm.rvs(size = x, loc = a, scale = b)

# Try it out.
N = 1000000

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.

# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)

# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)

Следуя ссылке, приведенной в ответе на вопрос в Невозможно рассолить < type 'instancemethod' > при использовании многопроцессорной обработки python Pool.map() Стивен Бетард (почти в конце) предлагает использовать модуль copy_reg. Его код:

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

import copy_reg
import types

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

Я действительно не понимаю, как я могу это использовать. Единственное, что я мог придумать, это положить его прямо перед моим кодом, но это не помогло. Простым решением является, конечно, просто пойти с тем, который работает, и не вмешиваться в copy_reg. Я больше заинтересован в том, чтобы copy_reg работал правильно, чтобы полностью использовать многопроцессорность без необходимости каждый раз обходить проблему.

Спасибо за вашу помощь, это очень ценится.

Матиас

4b9b3361

Ответ 1

Проблема здесь в том, что сообщение об ошибке "pickle" меньше, чем концептуальное: multiprocess делает fork ваш код в "рабочих" разных процессах, чтобы выполнить его волшебство.

Затем он отправляет данные в и из другого процесса, плавно сериализуя и де-сериализуя данные (это та часть, которая использует рассол).

Когда часть данных, передаваемых взад и вперед, является функцией - она ​​предполагает, что функция с тем же именем существует в процессе вызова, и (я думаю) передает имя функции в виде строки. Поскольку функции не имеют состояния, вызываемый рабочий процесс просто вызывает эту же функцию с полученными данными. (Функции Python не могут быть сериализованы через рассол, так что только эта ссылка передается между мастером и рабочими процессами)

Когда ваша функция является методом в экземпляре, хотя, когда мы кодируем python, это похоже на функцию с автоматической переменной self, но она не то же самое. Потому что экземпляры (объекты) являются состояниями. Это означает, что рабочий процесс не имеет копии объекта, который является владельцем метода, который вы хотите вызвать с другой стороны.

Работа с методами передачи вашего метода как функции вызова map_async также не будет работать - поскольку многопроцессор просто использует ссылку на функцию, а не фактическую функцию при ее передаче.

Итак, вы должны (1) либо изменить свой код, чтобы вы передавали функцию, а не метод, рабочим процессам, конвертируя любые состояния, которые объект сохраняет в новые параметры, которые будут вызываться. (2) Создайте "целевую" функцию для вызова map_async, который восстанавливает необходимый объект на стороне рабочего процесса, а затем вызывает функцию внутри него. Самые простые классы в Python сами выбираются, поэтому вы можете передать объект, являющийся самим владельцем функции, на вызов map_async, а функция "target" вызовет соответствующий метод на стороне рабочего.

(2) может звучать "сложно", но, вероятно, это что-то вроде этого, если только ваш класс объектов не может быть маринован:

import types

def target(object, *args, **kw):
    method_name = args[0]
    return getattr(object, method_name)(*args[1:])
(...)    
#And add these 3 lines prior to your map_async call:


    # Evaluate function
    if isinstance (func, types.MethodType):
        arguments.insert(0, func.__name__)
        func = target
    result = pool.map_async(func, arguments, chunksize = chunksize)

* отказ от ответственности: я не тестировал этот