Подтвердить что ты не робот

Прочитайте большой csv в разреженный pandas dataframe в эффективном режиме памяти

Функция pandas read_csv не имеет разреженной опции. У меня есть данные csv с тонны нулей в нем (он очень хорошо сжимается, и удаление любого значения 0 уменьшает его до почти половины исходного размера).

Я попытался загрузить его в плотную матрицу сначала с помощью read_csv, а затем вызвал to_sparse, но он занимает много времени и задыхается в текстовых полях, хотя большая часть данных является плавающей точкой. Если я сначала назову pandas.get_dummies(df), чтобы преобразовать категориальные столбцы в единицы и нули, тогда вызовите to_sparse(fill_value=0), это занимает абсурдное количество времени, намного дольше, чем я ожидал бы в основном числовой таблице, содержащей 12 миллионов записей, в основном ноль. Это происходит, даже если я удаляю нули из исходного файла и вызываю to_sparse() (так что значение заливки равно NaN). Это также происходит независимо от того, прошел ли я kind='block' или kind='integer'.

Помимо создания разреженного массива данных вручную, есть ли хороший, плавный способ загрузки разреженного csv напрямую, не съедая кучу лишней памяти?


Вот некоторый код для создания образца набора данных, который имеет 3 столбца данных с плавающей запятой и один столбец текстовых данных. Примерно 85% значений float равны нулю, а общий размер CSV составляет приблизительно 300 МБ, но вы, вероятно, захотите сделать это больше, чтобы действительно проверить ограничения памяти.

np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)

И вот простой способ прочитать его, но, надеюсь, есть лучший, более эффективный способ:

sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)

Изменить для добавления (от JohnE):. Если возможно, предоставьте некоторую статистику относительной производительности при чтении больших CSV в вашем ответе, включая информацию о том, как вы измеряли эффективность памяти (особенно, поскольку эффективность памяти сложнее для измерения, чем время часов). В частности, обратите внимание, что лучшим ответом может быть более медленный (часовой пояс), , если он более эффективен с точки зрения памяти.

4b9b3361

Ответ 1

Я бы, вероятно, обратился к этому, используя dask для загрузки ваших данных потоковым способом. Например, вы можете создать файл данных dask следующим образом:

import dask.dataframe as ddf
data = ddf.read_csv('test.csv')

Этот объект data на самом деле ничего не сделал в этот момент; он просто содержит "рецепт" сортов для чтения фрейма данных с диска в управляемых кусках. Если вы хотите материализовать данные, вы можете вызвать compute():

df = data.compute().reset_index(drop=True)

На этом этапе у вас есть стандартный pandas dataframe (мы вызываем reset_index, потому что по умолчанию каждый раздел независимо индексируется). Результат эквивалентен тому, что вы получаете, вызывая непосредственно pd.read_csv:

df.equals(pd.read_csv('test.csv'))
# True

Преимущество dask заключается в том, что вы можете добавлять инструкции к этому "рецепту" для построения вашего фрейма данных; например, вы можете сделать каждый раздел данных разреженным следующим образом:

data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))

В этот момент вызов compute() будет содержать разреженный массив:

df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame

Профилирование

Чтобы проверить, как подход dask сравнивается с исходным подходом pandas, сделайте некоторое профилирование строк. Я буду использовать lprun и mprun, как описано здесь (полное раскрытие: это раздел моей собственной книги).

Предполагая, что вы работаете в ноутбуке Jupyter, вы можете запустить его следующим образом:

Сначала создайте отдельный файл с основными задачами, которые мы хотим сделать:

%%file dask_load.py

import numpy as np
import pandas as pd
import dask.dataframe as ddf

def compare_loads():
    df = pd.read_csv('test.csv')
    df_sparse = df.to_sparse(fill_value=0)

    df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    df_dask = df_dask.compute().reset_index(drop=True)

Далее сделайте линейное профилирование для времени вычисления:

%load_ext line_profiler

from dask_load import compare_loads
%lprun -f compare_loads compare_loads()

Получаю следующий результат:

Timer unit: 1e-06 s

Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def compare_loads():
     7         1      4746788 4746788.0     34.1      df = pd.read_csv('test.csv')
     8         1       769303 769303.0      5.5      df_sparse = df.to_sparse(fill_value=0)
     9                                           
    10         1        33992  33992.0      0.2      df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11         1         7848   7848.0      0.1      df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12         1      8348217 8348217.0     60.0      df_dask = df_dask.compute().reset_index(drop=True)

Мы видим, что около 60% времени тратится на вызов dask, тогда как около 40% времени тратится на вызов pandas для массива примеров выше. Это говорит нам о том, что для этой задачи dask примерно на 50% медленнее, чем pandas: этого следует ожидать, поскольку фрагментация и рекомбинация разделов данных приводит к некоторым дополнительным издержкам.

Где dask shines используется в памяти: используйте mprun для создания профиля строки за строкой:

%load_ext memory_profiler
%mprun -f compare_loads compare_loads()

Результат на моей машине таков:

Filename: /Users/jakevdp/dask_load.py

Line #    Mem usage    Increment   Line Contents
================================================
     6     70.9 MiB     70.9 MiB   def compare_loads():
     7    691.5 MiB    620.6 MiB       df = pd.read_csv('test.csv')
     8    828.8 MiB    137.3 MiB       df_sparse = df.to_sparse(fill_value=0)
     9                             
    10    806.3 MiB    -22.5 MiB       df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11    806.4 MiB      0.1 MiB       df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12    947.9 MiB    141.5 MiB       df_dask = df_dask.compute().reset_index(drop=True)

Мы видим, что окончательный размер фрейма pandas составляет около ~ 140 МБ, но pandas использует ~ 620 МБ по пути, поскольку он считывает данные во временный плотный объект.

С другой стороны, dask использует только ~ 140 Мбайт всего при загрузке массива и создании окончательного разреженного результата. В случае, если вы читаете данные, плотный размер которых сопоставим с памятью, доступной в вашей системе, у dask есть явное преимущество, несмотря на более медленное вычислительное время на 50%.


Но для работы с большими данными вы не должны останавливаться здесь. Предположительно, вы выполняете некоторые операции над вашими данными, а абстракция данных dask позволяет вам выполнять эти операции (т.е. Добавлять их в "рецепт" ), прежде чем материализовать данные. Поэтому, если вы делаете с данными арифметику, агрегации, группировку и т.д., Вам даже не нужно беспокоиться о разреженном хранилище: просто выполните эти операции с объектом dask, вызовите compute() в конце и dask позаботится о применении их в памяти эффективным способом.

Так, например, я мог вычислить max() каждого столбца, используя фрейм данных dask, без необходимости сразу загружать все это в память:

>>> data.max().compute()
x      5.38114
y      5.33796
z      5.25661
txt          j
dtype: object

Работа с файловыми кадрами dask напрямую позволит вам обойти заботы о представлении данных, потому что вам, вероятно, никогда не придется сразу загружать все данные в память.

Удачи!

Ответ 2

Здесь ответ предлагается в основном как контрольный показатель. Надеюсь, что есть лучшие способы, чем это.

chunksize = 1000000       # perhaps try some different values here?
chunks = pd.read_csv( 'test.csv', chunksize=chunksize, dtype={'txt':'category'} )
sdf = pd.concat( [ chunk.to_sparse(fill_value=0.0) for chunk in chunks ] )

Как примечания @acushner, вы можете вместо этого сделать это как выражение генератора:

sdf = pd.concat( chunk.to_sparse(fill_value=0.0) for chunk in chunks )

Похоже, что существует консенсус в отношении того, что это лучше, чем список comp, хотя в моем тестировании я не видел больших различий, но, возможно, вы могли бы с разными данными.

Я надеялся сообщить о профилировании памяти по различным методам, но изо всех сил пытался получить согласованные результаты, я подозреваю, потому что python всегда очищает память за кулисами, в результате чего к результатам добавляются случайные шумы. (В комментарии к ответу Джейка он предлагает перезапустить ядро ​​jupyter перед каждым %memit, чтобы получить более последовательные результаты, но я еще не пробовал это.)

Но я постоянно находил (используя %%memit), что chunking, прочитанный выше, и метод dask @jakevdp использовали как-то очень примерно в окрестности половины памяти как наивный метод в OP.

Подробнее о профилировании я бы рекомендовал "Профилирование и код синхронизации" в книге Джейка "Справочник по науке о питонах" и рекомендовал книгу в целом.;-)