Подтвердить что ты не робот

Как эффективно обрабатывать последовательные куски Pandas dataframe

У меня есть большой фрейм данных (несколько миллионов строк).

Я хочу иметь возможность выполнять операцию groupby на нем, а просто группировать произвольные последовательные (предпочтительно равные) подмножества строк, а не использовать какое-либо конкретное свойство отдельных строк, чтобы решить, к какой группе они идут.

Вариант использования: я хочу применить функцию к каждой строке через параллельную карту в IPython. Не имеет значения, какие строки попадают в какой-то back-end движок, поскольку функция вычисляет результат на основе одной строки за раз. (Концептуально, по крайней мере, в действительности он векторизован.)

Я придумал что-то вроде этого:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)

Но это кажется очень длинным, и не гарантирует равных размеров кусков. Особенно, если индекс разрежен или нецелый или что-то еще.

Любые предложения для лучшего способа?

Спасибо!

4b9b3361

Ответ 1

На практике вы не можете гарантировать равные размеры блоков: количество строк может быть простым, в конце концов, в этом случае ваши единственные параметры chunking будут кусками размера 1 или одного большого фрагмента. Я склонен передавать массив в groupby. Начиная с:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

где я намеренно сделал индекс неинформативным, установив его в 0, мы просто определяем наш размер (здесь 10) и целочисленное разделение массива на него:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

Методы, основанные на разрезе DataFrame, могут завершиться неудачно, если индекс несовместим с этим, хотя вы всегда можете использовать .iloc[a:b] для игнорирования значений индекса и доступа к данным по положению.

Ответ 2

Используйте numpy array_split():

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
  assert len(chunk) == len(data) / 5

Ответ 3

Я не уверен, что это именно то, что вы хотите, но я нашел эти функции grouper на другом потоке SO довольно полезным для создания многопроцессорного пула.

Вот краткий пример из этого потока, который может сделать что-то вроде того, что вы хотите:

import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))

for i in chunker(df,5):
    print i

Что дает вам что-то вроде этого:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

Я надеюсь, что это поможет.

ИЗМЕНИТЬ

В этом случае я использовал эту функцию с пулом процессоров в (приблизительно) следующим образом:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

Я предполагаю, что это должно быть очень похоже на использование распределенного механизма IPython, но я его не пробовал.

Ответ 4

Знак хорошей среды - это много вариантов, поэтому я добавлю это из Anaconda Blaze, используя Odo

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with chunked dataframe