Подтвердить что ты не робот

Многопроцессорность в python - совместное использование большого объекта (например, pandas dataframe) между несколькими процессами

Я использую многопроцессорность Python, точнее

from multiprocessing import Pool
p = Pool(15)

args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()

Этот подход имеет огромное потребление памяти; едя почти всю мою оперативную память (в этот момент она становится очень медленной, поэтому делает многопроцессорную обработку бесполезной). Я предполагаю, что проблема заключается в том, что df - это огромный объект (большой фреймворк pandas) и он копируется для каждого процесса. Я попытался использовать multiprocessing.Value для обмена файловым кадром без копирования

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...] 

(как предложено в общей многоадресной рассылки Python), но это дает мне TypeError: this type has no size (так же, как Обмен сложным объект между процессами Python?, к которому я, к сожалению, не понимаю ответ).

Я использую многопроцессорность в первый раз, и, возможно, мое понимание еще недостаточно. Действительно ли multiprocessing.Value даже правильная вещь в этом случае? Я видел другие предложения (например, очередь), но я немного запутался. Какие существуют варианты обмена памятью, и какой из них был бы лучше в этом случае?

4b9b3361

Ответ 1

Первый аргумент Value - typecode_or_type. Это определяется как:

typecode_or_type определяет тип возвращаемого объекта: он либо тип ctypes, либо один тип символьного типа, используемый модуль массива. * args передается конструктору для типа.

Акцент мой. Таким образом, вы просто не можете поместить pandas dataframe в Value, он должен быть тип ctypes.

Вместо этого вы можете использовать multiprocessing.Manager, чтобы обслуживать ваш экземпляр dataframe singleton ко всем вашим процессам. Там несколько разных способов оказаться в одном и том же месте - возможно, самое простое - просто перевернуть ваш файл данных в менеджер Namespace.

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe

# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))

Теперь ваш экземпляр dataframe доступен для любого процесса, которому передается ссылка на Менеджер. Или просто передайте ссылку на Namespace, это чище.

Одна вещь, которую я не рассматривал/не буду рассматривать, - это события и сигнализация - если вашим процессам нужно ждать, пока другие закончат выполнение, вам нужно добавить это. Вот страница с некоторыми примерами Event, которые также более подробно описывают, как использовать диспетчер Namespace.

(обратите внимание, что ничто из этого не означает, что multiprocessing приведет к ощутимым преимуществам производительности, это просто дает вам инструменты для изучения этого вопроса)

Ответ 2

Вы можете совместно использовать данные pandas между процессами без каких-либо дополнительных затрат памяти, создав дочерний процесс data_handler. Этот процесс получает вызовы от других дочерних элементов с конкретными запросами данных (т.е. Строкой, определенной ячейкой, срезом и т.д.) От вашего очень большого объекта фрейма данных. Только процесс data_handler сохраняет ваш фрейм данных в памяти, в отличие от диспетчера, подобного пространству имен, в результате которого фрейм данных копируется во все дочерние процессы. Смотрите ниже рабочий пример. Это может быть преобразовано в бассейн.

Вам нужен индикатор прогресса для этого? смотрите мой ответ здесь: fooobar.com/info/135412/...

import time
import Queue
import numpy as np
import pandas as pd
import multiprocessing
from random import randint

#==========================================================
# DATA HANDLER
#==========================================================

def data_handler( queue_c, queue_r, queue_d, n_processes ):

    # Create a big dataframe
    big_df = pd.DataFrame(np.random.randint(
        0,100,size=(100, 4)), columns=list('ABCD'))

    # Handle data requests
    finished = 0
    while finished < n_processes:

        try:
            # Get the index we sent in
            idx = queue_c.get(False)

        except Queue.Empty:
            continue
        else:
            if idx == 'finished':
                finished += 1
            else:
                try:
                    # Use the big_df here!
                    B_data = big_df.loc[ idx, 'B' ]

                    # Send back some data
                    queue_r.put(B_data)
                except:
                    pass    

# big_df may need to be deleted at the end. 
#import gc; del big_df; gc.collect()

#==========================================================
# PROCESS DATA
#==========================================================

def process_data( queue_c, queue_r, queue_d):

    data = []

    # Save computer memory with a generator
    generator = ( randint(0,x) for x in range(100) )

    for g in generator:

        """
        Lets make a request by sending
        in the index of the data we want. 
        Keep in mind you may receive another 
        child processes return call, which is
        fine if order isnt important.
        """

        #print(g)

        # Send an index value
        queue_c.put(g)

        # Handle the return call
        while True:
            try:
                return_call = queue_r.get(False)
            except Queue.Empty:
                continue
            else:
                data.append(return_call)
                break

    queue_c.put('finished')
    queue_d.put(data)   

#==========================================================
# START MULTIPROCESSING
#==========================================================

def multiprocess( n_processes ):

    combined  = []
    processes = []

    # Create queues
    queue_data = multiprocessing.Queue()
    queue_call = multiprocessing.Queue()
    queue_receive = multiprocessing.Queue()

    for process in range(n_processes): 

        if process == 0:

                # Load your data_handler once here
                p = multiprocessing.Process(target = data_handler,
                args=(queue_call, queue_receive, queue_data, n_processes))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_call, queue_receive, queue_data))
        processes.append(p)
        p.start()

    for i in range(n_processes):
        data_list = queue_data.get()    
        combined += data_list

    for p in processes:
        p.join()    

    # Your B values
    print(combined)


if __name__ == "__main__":

    multiprocess( n_processes = 4 )