Подтвердить что ты не робот

Python dask DataFrame, поддержка (тривиально параллелизуемая) строка применяется?

Недавно я нашел dask модуль, который должен быть простым в использовании модулем параллельной обработки python. Большая точка продажи для меня заключается в том, что она работает с pandas.

Прочитав немного на своей странице руководства, я не могу найти способ выполнить эту тривиально параллелизуемую задачу:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

В настоящий момент для достижения этого в dask, AFAIK,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

который является уродливым синтаксисом и на самом деле медленнее, чем прямой

df.apply(func, axis = 1) # for pandas DF row apply

Любое предложение?

Изменить: Спасибо @MRocklin за функцию карты. Он выглядит медленнее обычного pandas. Является ли это связанным с выпуском GIL выпуском pandas, или я делаю это неправильно?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
4b9b3361

Ответ 1

map_partitions

Вы можете применить свою функцию ко всем разделам вашего фрейма данных с помощью функции map_partitions.

df.map_partitions(func, columns=...)

Обратите внимание, что func будет указывать только часть набора данных за раз, а не весь набор данных, например, с помощью pandas apply (который предположительно вам не нужен, если вы хотите сделать parallelism.)

map/apply

Вы можете сопоставить функцию по ряду последовательностей с помощью map

df.mycolumn.map(func)

Вы можете сопоставить функцию по-разному по файловому кадру с помощью apply

df.apply(func, axis=1)

Темы против процессов

Начиная с версии 0.6.0 dask.dataframes распараллеливается с потоками. Пользовательские функции Python не получат большой пользы от parallelism на основе потоков. Вместо этого вы можете попробовать процессы

df = dd.read_csv(...)

from dask.multiprocessing import get
df.map_partitions(func, columns=...).compute(get=get)

Но избегайте apply

Однако вы действительно должны избегать apply с пользовательскими функциями Python, как в Pandas, так и в Dask. Это часто является источником плохой работы. Может быть, если вы найдете способ сделать свою операцию в векторном виде, то может быть, ваш код Pandas будет на 100 раз быстрее, и вам вообще не понадобится dask.dataframe.

Рассмотрим numba

Для вашей конкретной проблемы вы можете рассмотреть numba. Это значительно улучшает вашу производительность.

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

Отказ от ответственности, я работаю в компании, которая делает как numba, так и dask и использует многих разработчиков pandas.

Ответ 2

Как и v dask.dataframe.plply делегирует ответственность map_partitions:

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)