Подтвердить что ты не робот

Ускорение pandas.DataFrame.to_sql с fast_executemany из pyODBC

Я хотел бы отправить большой pandas.DataFrame на удаленный сервер с MS SQL. То, как я делаю это сейчас, - это преобразовать объект data_frame в список кортежей и затем отправить его с помощью executemany() pyODBC executemany(). Это выглядит примерно так:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

Затем я начал задаваться вопросом, можно ли ускорить работу (или, по крайней мере, более читаемую), с помощью data_frame.to_sql(). Я придумал следующее решение:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

Теперь код более читабельен, но загрузка по меньшей мере в 150 раз медленнее...

Есть ли способ перевернуть fast_executemany при использовании SQLAlchemy?

Я использую pandas-0.20.3, pyODBC-4.0.21 и sqlalchemy-1.1.13.

4b9b3361

Ответ 1

После обращения к разработчикам SQLAlchemy появился способ решить эту проблему. Большое спасибо им за отличную работу!

Нужно использовать событие выполнения курсора и проверить, был ли executemany флаг executemany. Если это действительно так, fast_executemany опцию fast_executemany. Например:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

Более подробную информацию о событиях выполнения можно найти здесь.


ОБНОВЛЕНИЕ: Поддержка fast_executemany pyodbc была добавлена в SQLAlchemy 1.3.0, так что этот хак больше не нужен.

Ответ 2

РЕДАКТИРОВАТЬ (2019-03-08): Горд Томпсон прокомментировал ниже хорошие новости из журналов обновлений sqlalchemy: начиная с SQLAlchemy 1.3.0, выпущенного 2019-03-04, sqlalchemy теперь поддерживает engine = create_engine(sqlalchemy_url, fast_executemany=True) для диалект mssql+pyodbc. Т.е. больше нет необходимости определять функцию и использовать @event.listens_for(engine, 'before_cursor_execute') означает, что нижеследующая функция может быть удалена, и в операторе create_engine должен быть установлен только флаг - и при этом сохраняется скорость вверх.

Исходное сообщение:

Просто сделал аккаунт чтобы опубликовать это. Я хотел прокомментировать вышеупомянутую ветку, поскольку это продолжение уже предоставленного ответа. Вышеупомянутое решение работало для меня с драйвером SQL версии 17 для записи в хранилище Microsft SQL из установки на основе Ubuntu.

Полный код, который я использовал для значительного ускорения (говорим> ускорение в 100 раз), приведен ниже. Это фрагмент кода "под ключ" при условии, что вы измените строку подключения, указав соответствующие данные. На постере выше, большое спасибо за решение, так как я уже довольно долго искал это.

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

Основываясь на комментариях ниже, я хотел потратить некоторое время, чтобы объяснить некоторые ограничения в реализации pandas to_sql и способа обработки запроса. Есть две вещи, которые могут вызвать MemoryError.

1) Предполагается, что вы пишете в удаленное хранилище SQL. Когда вы пытаетесь написать большой DataFrame для панд с to_sql метода to_sql он преобразует весь dataframe в список значений. Это преобразование занимает намного больше оперативной памяти, чем исходный DataFrame (поверх него, так как старый DataFrame все еще присутствует в RAM). Этот список предоставляется последнему вызову executemany для вашего коннектора ODBC. Я думаю, что у соединителя ODBC есть некоторые проблемы, обрабатывающие такие большие запросы. Способ решения этой проблемы является предоставление to_sql метод а chunksize аргумент (10 ** 5, кажется, вокруг оптимальной давая около 600 Мбит/с() скорость записи на 2 CPU 7GB барана приложения MSSQL Storage от Azure - не может рекомендую Azure кстати). Таким образом, первое ограничение - размер запроса - можно обойти, предоставив аргумент chunksize. Однако это не позволит вам записать фрейм данных размером 10 ** 7 или больше (по крайней мере, не на той виртуальной машине, с которой я работаю, у которой ~ 55 ГБ ОЗУ), поскольку это номер 2.

Этого можно обойти, разбив DataFrame с помощью np.split (составляющего 10 ** 6 блоков DataFrame размера). Они могут быть записаны итеративно. Я попытаюсь сделать пулл-запрос, когда у меня будет готово решение для метода to_sql в ядре самой панды, так что вам не придется делать это предварительно каждый раз. В любом случае я написал функцию, похожую (не под ключ) на следующую:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

Более полный пример приведенного выше фрагмента можно посмотреть здесь: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

Это класс, который я написал, который включает в себя патч и облегчает некоторые необходимые накладные расходы, связанные с настройкой соединений с SQL. Еще нужно написать какую-то документацию. Также я планировал добавить патч для самой панды, но пока не нашел хорошего способа сделать это.

Надеюсь, это поможет.

Ответ 3

Я просто хотел опубликовать этот полный пример в качестве дополнительного высокопроизводительного варианта для тех, кто может использовать новую библиотеку turbodbc: http://turbodbc.readthedocs.io/en/latest/

Ясно, что существует множество параметров между потоками между пандами .to_sql(), запускающих fast_executemany через sqlalchemy, непосредственного использования pyodbc с кортежами /lists/etc. Или даже попыткой BULK UPLOAD с плоскими файлами.

Надеемся, что следующее может сделать жизнь немного приятнее, так как функциональность развивается в текущем проекте Pandas или включает что-то вроде интеграции с Turbodbc в будущем.

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc должен быть ОЧЕНЬ быстрым во многих случаях использования (особенно с массивами numpy). Пожалуйста, обратите внимание, насколько просто передать лежащие в основе массивы из столбцов данных в качестве параметров непосредственно в запрос. Я также считаю, что это помогает предотвратить создание промежуточных объектов, которые чрезмерно увеличивают потребление памяти. Надеюсь, что это полезно!

Ответ 4

Я столкнулся с той же проблемой, но с использованием PostgreSQL. Теперь они просто выпускают версию 0.24.0 для панд, и в функции to_sql появился новый параметр method который решил мою проблему.

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

Скорость загрузки в 100 раз выше для меня. Я также рекомендую установить параметр chunksize если вы собираетесь отправлять много данных.

Ответ 5

Похоже, что Pandas 0.23.0 и 0.24.0 используют вставки нескольких значений с PyODBC, что не позволяет быстрому выполнению помочь - один оператор INSERT... VALUES... испускается для каждого чанка. Чанки вставки с несколькими значениями являются улучшением по сравнению со старым по умолчанию медленным выполнением, но, по крайней мере, в простых тестах метод быстрого выполнения все еще преобладает, не говоря уже о том, что нет необходимости в ручных вычислениях chunksize, как это требуется для вставок с несколькими значениями. Принудительное старое поведение может быть выполнено с помощью monkeypatching, если в будущем не будет предоставлена опция конфигурации:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

Будущее уже здесь, и, по крайней мере, в master ветке метод вставки можно контролировать с помощью ключевого слова аргумент method= of to_sql(). По умолчанию None, что вызывает метод executemany. Передача method='multi' приводит к использованию вставки с несколькими значениями. Он может даже использоваться для реализации конкретных подходов к СУБД, таких как Postgresql COPY.

Ответ 6

Как указано @Pylander

На сегодняшний день Turbodbc - лучший выбор для приема данных!

Я так обрадовался этому, что написал "блог" на своем github и medium: пожалуйста, проверьте https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

за рабочий пример и сравнение с pandas.to_sql

Короче,

с turbodbc у меня 10000 строк (77 столбцов) за 3 секунды

с pandas.to_sql я получил те же 10000 строк (77 столбцов) за 198 секунд...

И вот что я делаю в деталях

Импорт:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time

Загрузите и обработайте некоторые данные - замените мой sample.pkl своим:

df = pd.read_pickle('sample.pkl')

df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...

Создайте таблицу с помощью sqlAlchemy

К сожалению, turbodbc требует больших накладных расходов с большим количеством ручного труда sql, для создания таблиц и для вставки данных в него.

К счастью, Python является чистой радостью, и мы можем автоматизировать этот процесс написания кода SQL.

Первым шагом является создание таблицы, которая будет получать наши данные. Однако создание таблицы с ручным написанием кода SQL может быть проблематичным, если в вашей таблице более нескольких столбцов. В моем случае очень часто таблицы имеют 240 столбцов!

В этом нам могут помочь sqlAlchemy и pandas: pandas плохо подходит для записи большого количества строк (в нашем примере это 10000), но как насчет всего 6 строк, заголовка таблицы? Таким образом, мы автоматизируем процесс создания таблиц.

Создать sqlAlchemy соединение:

mydb = 'someDB'

def make_con(db):
    """Connect to a specified db."""
    database_connection = sqlalchemy.create_engine(
        'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
            myuser, mypassword,
            myhost, db
            )
        )
    return database_connection

pd_connection = make_con(mydb)

Создать таблицу на SQL Server

Использование pandas + sqlAlchemy, но только для подготовки комнаты для turbodbc, как упоминалось ранее. Обратите внимание, что здесь df.head(): мы используем pandas + sqlAlchemy для вставки только 6 строк наших данных. Это будет выполняться довольно быстро и делается для автоматизации создания таблицы.

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)

Теперь, когда стол уже на месте, давайте серьезно.

Turbodbc соединение:

def turbo_conn(mydb):
    """Connect to a specified db - turbo."""
    database_connection = turbodbc.connect(
                                            driver='ODBC Driver 17 for SQL Server',
                                            server=myhost,
                                            database=mydb,
                                            uid=myuser,
                                            pwd=mypassword
                                        )
    return database_connection

Подготовка sql команд и данных для turbodbc. Позволяет автоматизировать создание этого кода, будучи креативным:

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    colunas = '('
    colunas += ', '.join(df.columns)
    colunas += ')'

    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'

    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {colunas}
    VALUES {sql_val}
    """

    # writing array of values for turbodbc
    valores_df = [df[col].values for col in df.columns]

    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()

    # inserts data, for real
    with connection.cursor() as cursor:
        try:
            cursor.executemanycolumns(sql, valores_df)
            connection.commit()
        except Exception:
            connection.rollback()
            print('something went wrong')

    stop = time.time() - start
    return print(f'finished in {stop} seconds')

Запись данных с использованием turbodbc - Ive получил 10000 строк (77 столбцов) за 3 секунды:

turbo_write(mydb, df.sample(10000), table)

Сравнение метода Панд - у меня те же 10000 строк (77 столбцов) за 198 секунд...

table = 'pd_testing'

def pandas_comparisson(df, table):
    """Load data using pandas."""
    start = time.time()
    df.to_sql(table, con=pd_connection, index=False)
    stop = time.time() - start
    return print(f'finished in {stop} seconds')

pandas_comparisson(df.sample(10000), table)

Окружающая среда и условия

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0
sqlAlchemy version ‘1.2.12
pandas version ‘0.23.4
Microsoft SQL Server 2014
user with bulk operations privileges

Пожалуйста, проверьте https://erickfis.github.io/loose-code/ для обновлений в этом коде!

Ответ 7

Производительность SQL Server INSERT: pyodbc против turbodbc

При использовании to_sql для загрузки DataFrame pandas на SQL Server, turbodbc определенно будет быстрее, чем pyodbc без fast_executemany. Однако при включенном fast_executemany для pyodbc оба подхода дают практически одинаковую производительность.

Тестовые среды:

[Venv1_pyodbc]
pyodbc 2.0.25

[Venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[общий для обоих]
Python 3.6.4 64-битный на Windows
SQLAlchemy 1.3.0b1
Панды 0.23.4
numpy 1.15.4

Тестовый код:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:[email protected]_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:[email protected]_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

Тесты проводились двенадцать (12) раз для каждой среды, отбрасывая лучшие и худшие времена для каждой среды. Результаты (в секундах):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0

Ответ 8

Просто хотел добавить в ответ @JK.

Если вы используете этот подход:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

И вы получаете эту ошибку:

"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft] [собственный клиент SQL Server 11.0] Ошибка последовательности функций (0) (SQLParamData)') [SQL: 'INSERT INTO... (...) VALUES (?,?) '] [Parameters: ((...,...), (...,...)] (Справочная информация об этой ошибке по адресу: http://sqlalche.me/e/dbapi) "

Кодируйте строковые значения, например, так: 'yourStringValue'.encode('ascii')

Это решит вашу проблему.