Эффективный способ прочитать 15 M строк CSV файлов в Python - программирование

Эффективный способ прочитать 15 M строк CSV файлов в Python

Для моего приложения мне нужно прочитать несколько файлов по 15 М строк в каждом, сохранить их в DataFrame и сохранить DataFrame в формате HDFS5.

Я уже пробовал разные подходы, в частности pandas.read_csv со спецификациями chunksize и dtype и dask.dataframe. На обработку одного файла у обоих уходит около 90 секунд, и поэтому я хотел бы знать, есть ли способ эффективно обработать эти файлы описанным способом. Далее я покажу код тестов, которые я сделал.

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re 

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1e6

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
                skiprows=15
           )              
chunk_list = [] 


for chunk in df_chunk:
      chunk_list.append(chunk)


df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                    "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
        skiprows=15
     )
store.put(dfname, df.compute())
store.close()

Вот как выглядят файлы (пробел состоит из буквальной вкладки):

a   b
599.998413  14.142895
599.998413  20.105534
599.998413  6.553850
599.998474  27.116098
599.998474  13.060312
599.998474  13.766775
599.998596  1.826706
599.998596  18.275938
599.998718  20.797491
599.998718  6.132450)
599.998718  41.646194
599.998779  19.145775
4b9b3361

Ответ 1

Ну, мои выводы не сильно связаны с пандами, а скорее с некоторыми распространенными ошибками.

Your code: 
(genel_deneme) ➜  derp time python a.py
python a.py  38.62s user 0.69s system 100% cpu 39.008 total
  1. предварительно скомпилируйте свое регулярное выражение
Replace re.sub(r"[^\d.]", "", x) with precompiled version and use it in your lambdas
Result : 
(genel_deneme) ➜  derp time python a.py 
python a.py  26.42s user 0.69s system 100% cpu 26.843 total
  1. Попробуйте найти лучший способ, чем напрямую использовать np.float32, так как он в 6-10 раз медленнее, чем вы ожидаете. Следующее не то, что вы хотите, но я просто хочу показать проблему здесь.
replace np.float32 with float and run your code. 
My Result:  
(genel_deneme) ➜  derp time python a.py
python a.py  14.79s user 0.60s system 102% cpu 15.066 total

Найдите другой способ достичь результата с помощью поплавков. Подробнее по этому вопросу fooobar.com/questions/123837/...

  1. Разделите ваш файл и работу на подпроцессы, если можете. Вы уже работаете с отдельными кусками постоянного размера. Таким образом, в основном вы можете разделить файл и обрабатывать работу в отдельных процессах, используя многопроцессорность или потоки.

Ответ 2

Во-первых, давайте ответим на заголовок вопроса

1- Как эффективно прочитать 15M строк в формате csv, содержащем числа с плавающей точкой

Я предлагаю вам использовать модин:

Генерация образца данных:

import modin.pandas as mpd
import pandas as pd
import numpy as np

frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2)) 
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)
!wc 15mil*.csv ; du -h 15mil*.csv

    15000000   15000000  480696661 15mil.csv
    459M    15mil.csv

Теперь к тестам:

%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
    9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
    3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
(df2.values == df1.values).all()
    True

Итак, как мы видим, модин был примерно в 3 раза быстрее на моей установке.


Теперь, чтобы ответить на вашу конкретную проблему

2- Очистка CSV файла, который содержит не числовые символы, и затем чтение его

Как уже отмечалось, ваше узкое место, вероятно, является конвертером. Вы называете эти лямбды 30 миллионов раз. Даже издержки вызова функции становятся нетривиальными в этом масштабе.

Пусть атакуют эту проблему.

Создание грязного набора данных:

!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv

подходы

Сначала я попытался использовать модин с аргументом преобразователей. Затем я попробовал другой подход, который вызывает регулярное выражение меньше раз:

Сначала я создам File-like объект, который фильтрует все через ваше регулярное выражение:

class FilterFile():
    def __init__(self, file):
        self.file = file
    def read(self, n):
        return re.sub(r"[^\d.,\n]", "", self.file.read(n))
    def write(self, *a): return self.file.write(*a) # needed to trick pandas
    def __iter__(self, *a): return self.file.__iter__(*a) # needed

Затем мы передаем его пандам в качестве первого аргумента в read_csv:

with open('15mil_dirty.csv') as file:
    df2 = pd.read_csv(FilterFile(file))

тесты:

%%timeit -r 1 -n 1 -t
global df1
df1 = pd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    2min 28s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df2
df2 = mpd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    38.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df3
df3 = pd.read_csv(FilterFile(open('15mil_dirty.csv')), header=None,)
    1min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Похоже, Модин снова побеждает! К сожалению, modin еще не реализовал чтение из буферов, поэтому я разработал ULTIMATE-подход:

%%timeit -r 1 -n 1 -t
with open('15mil_dirty.csv') as f, open('/dev/shm/tmp_file', 'w') as tmp:
    tmp.write(f.read().translate({ord(i):None for i in '()'}))
df4 = mpd.read_csv('/dev/shm/tmp_file', header=None)
    5.68 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Это использует translate который значительно быстрее, чем re.sub, а также использует /dev/shm который является файловой системой в памяти, которую обычно предоставляет ubuntu (и другие linux). Любой записанный там файл никогда не попадет на диск, поэтому он быстрый. Наконец, он использует modin для чтения файла, обойдя ограничение буфера modin. Этот подход примерно в 30 раз быстрее, чем ваш, и он также довольно прост.