Подтвердить что ты не робот

Параллелизировать применять после pandas groupby

Я использовал rosetta.parallel.pandas_easy для параллелизации, применяемого после группы, например:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

Однако кто-нибудь понял, как распараллелить функцию, которая возвращает фреймворк? Этот код не работает для rosetta, как и ожидалось.

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
4b9b3361

Ответ 1

Кажется, что это работает, хотя он действительно должен быть встроен в pandas

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)

Ответ 2

Ответ Ivan велик, но похоже, что его можно немного упростить, также устраняя необходимость зависеть от joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

Кстати: это не может заменить какой-либо groupby.apply(), но он будет охватывать типичные случаи: например. он должен охватывать случаи 2 и 3 в документации, в то время как вы должны получить поведение case 1, указав аргумент axis=1 до окончательного вызова pandas.concat().

Ответ 3

У меня есть хак, который я использую для получения распараллеливания в Pandas. Я разбиваю свою фреймворк на куски, помещаю каждый фрагмент в элемент списка, а затем использую параллельные биты ipython для параллельной работы в списке данных. Затем я вернул список вместе с помощью функции pandas concat.

Однако это не применимо. Это работает для меня, потому что функция, которую я хочу применить к каждому фрагменту кадра данных, занимает около минуты. И вытягивание и сбор моих данных не так долго. Таким образом, это, безусловно, куд. С этим сказал, вот пример. Я использую ноутбук Ipython, поэтому в моем коде вы увидите магию %%time:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')

В этом примере я собираюсь сделать "куски" на основе вышеперечисленного groupby, но это не обязательно должно быть так, как данные разбиваются на разделы. Хотя это довольно распространенная картина.

dflist = []
for name, group in grouped:
    dflist.append(group)

настроить параллельные биты

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

напишите глупую функцию, применимую к нашим данным

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

теперь можно запустить код последовательно, затем параллельно. сначала:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

теперь параллельна

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

то требуется всего несколько мс, чтобы объединить их обратно в один фрейм данных

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

На моем MacBook работает 6 IPython-движков, но вы можете увидеть, что он сокращает время выполнения до 2 с от 14 секунд.

Для действительно длительных стохастических симуляций я могу использовать AWS-сервер, создав кластер с StarCluster. Однако большую часть времени я распараллеливаю только 8 процессоров на моем MBP.

Ответ 4

Краткий комментарий для сопровождения ответа JD Long. Я обнаружил, что если количество групп очень велико (скажем, сотни тысяч), и ваша прикладная функция делает что-то довольно простое и быстрое, а затем разбивает ваш блок данных на куски и назначает каждому куску работнику для выполнения groupby-apply (в последовательном порядке) может быть намного быстрее, чем выполнять параллельную групповую подачу заявки, а рабочие считывают очередь, содержащую множество групп. Пример:

import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})

Итак, наш фреймворк выглядит так:

    a
0   3425
1   1016
2   8141
3   9263
4   8018

Обратите внимание, что столбец "a" имеет много групп (думаю, идентификаторы клиентов):

len(df.a.unique())
15000

Функция для работы с нашими группами:

def f1(group):
    time.sleep(0.0001)
    return group

Запустите пул:

ppe = ProcessPoolExecutor(12)
futures = []
results = []

Сделайте параллельную групповую подачу:

%%time

for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
del ppe

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s

Теперь добавим столбец, который разбивает df на несколько меньших групп:

df['b'] = np.random.randint(0, 12, nrows)

Теперь вместо 15000 групп есть только 12:

len(df.b.unique())
12

Мы разделим наш df и применим groupby-apply на каждом фрагменте.

ppe = ProcessPoolExecutor(12)

Wrapper fun:

def f2(df):
    df.groupby('a').apply(f1)
    return df

Отправляйте каждый блок, который будет работать последовательно:

%%time

for i in df.b.unique():
    p = ppe.submit(f2, df[df.b==i])
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s

Обратите внимание, что количество времени, затрачиваемого на группу, не изменилось. Скорее, что изменилось, это длина очереди, из которой считаются рабочие. Я подозреваю, что происходит то, что рабочие не могут одновременно обращаться к разделяемой памяти и постоянно возвращаются к чтению очереди, и поэтому наступают друг на друга. С более крупными кусками, чтобы работать, рабочие возвращаются реже, и поэтому эта проблема улучшается, а общее выполнение выполняется быстрее.

Ответ 5

Лично я рекомендовал бы использовать dask, согласно этой теме.

Как указал @chrisb, многопроцессорная обработка с использованием панд в python может привести к ненужным накладным расходам. Он также может работать не так хорошо, как многопоточность или даже как один поток.

Dask создан специально для мультипроцессинга.