Подтвердить что ты не робот

Объекты общей памяти в многопроцессорной обработке

Предположим, что у меня большой массив памяти numpy, у меня есть функция func, которая принимает этот гигантский массив в качестве входных данных (вместе с некоторыми другими параметрами). func с разными параметрами можно запускать параллельно. Например:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Если я использую библиотеку многопроцессорности, то этот гигантский массив будет копироваться несколько раз в разные процессы.

Есть ли способ разрешить различным процессам использовать один и тот же массив? Этот объект массива доступен только для чтения и никогда не будет изменен.

Что еще сложнее, если arr - это не массив, а произвольный объект python, есть ли способ поделиться им?

[Изменено]

Я прочитал ответ, но я все еще немного смущен. Поскольку fork() является копированием на запись, мы не должны вызывать никаких дополнительных затрат при возникновении новых процессов в библиотеке многопроцессорности python. Но следующий код предполагает, что есть огромные накладные расходы:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

(и, кстати, стоимость увеличивается по мере увеличения размера массива, поэтому я подозреваю, что все еще накладные расходы связаны с копированием памяти):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Почему такие огромные накладные расходы, если мы не скопировали массив? А какая часть памяти хранит меня?

4b9b3361

Ответ 1

Если вы используете операционную систему, которая использует семантику fork() копирования при записи (как и любой обычный unix), то, если вы никогда не измените свою структуру данных, она будет доступна всем дочерним процессам без использования дополнительной памяти. Вам не нужно будет делать ничего особенного (кроме того, что вы должны быть абсолютно уверены, что не изменили объект).

Наиболее эффективная вещь, которую вы можете сделать для вашей проблемы, - это упаковать ваш массив в эффективную структуру массива (используя numpy или array), поместить его в общую память, обернуть его с помощью multiprocessing.Array и передать его своим функциям. Этот ответ показывает, как это сделать.

Если вам нужен доступный для записи общий объект, вам нужно обернуть его какой-то синхронизацией или блокировкой. multiprocessing обеспечивает два способа сделать это: один использует общую память (подходит для простых значений, массивов или типов) или прокси-сервер Manager, где один процесс удерживает память, а менеджер осуществляет арбитражный доступ к ней из других процессов (даже по сети),

Подход Manager может использоваться с произвольными объектами Python, но он будет медленнее, чем эквивалент с использованием разделяемой памяти, поскольку объекты должны быть сериализованы/десериализованы и отправлены между процессами.

Существует множество библиотек параллельной обработки и подходов, доступных в Python. multiprocessing - это отличная и хорошо округленная библиотека, но если у вас есть особые потребности, возможно, один из других подходов может быть лучше.

Ответ 2

Я столкнулся с той же проблемой и написал небольшой вспомогательный класс для совместной работы.

Я использую multiprocessing.RawArray (lockfree), а также доступ к массивам вообще не синхронизирован (lockfree), будьте осторожны, чтобы не выстрелить себе ногами.

Благодаря решению я получаю ускорения примерно в 3 раза на четырехъядерном процессоре i7.

Вот код: Не стесняйтесь использовать и улучшать его, и, пожалуйста, сообщайте о любых ошибках.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

Ответ 3

Это пример использования Ray, библиотеки для параллельного и распределенного Python. В рамках этого процесса он сериализует объекты, используя макет данных Apache Arrow (формат с нулевым копированием), и сохраняет их в хранилище объектов с общей памятью, чтобы они могли быть доступны для несколько процессов без создания копий.

Код будет выглядеть следующим образом.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Если вы не вызовете ray.put, то массив все равно будет храниться в общей памяти, но это будет сделано один раз за вызов func, а это не то, что вам нужно.

Обратите внимание, что это будет работать не только для массивов, но и для объектов, которые содержат массивы, например словари, отображающие целые числа в массивы, как показано ниже.

Вы можете сравнить производительность сериализации в Ray и pickle, выполнив следующее в IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Сериализация с Ray только немного быстрее, чем pickle, но десериализация в 1000 раз быстрее из-за использования общей памяти (это число, конечно, будет зависеть от объекта).

Смотрите документацию Ray. Вы можете прочитать больше о быстрой сериализации, используя Ray и Arrow. Заметьте, я один из разработчиков Ray.

Ответ 4

Как упоминал Роберт Нишихара, Apache Arrow делает это легко, особенно с хранилищем объектов в памяти Plasma, на котором построен Ray.

Я сделал brain-plasma специально по этой причине - быстрая загрузка и перезагрузка больших объектов в приложении Flask. Это пространство имен объектов разделяемой памяти для сериализуемых объектов Apache Arrow, включая строки тестов pickle, сгенерированные pickle.dumps(...).

Главное отличие Apache Ray и Plasma в том, что он отслеживает идентификаторы объектов для вас. Любые процессы, потоки или программы, работающие локально, могут делиться значениями переменных, вызывая имя из любого объекта brain.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]