Подтвердить что ты не робот

Python pandas to_sql с sqlalchemy: как ускорить экспорт в MS SQL?

У меня есть dataframe с примерно 155 000 строк и 12 столбцов. Если я экспортирую его в csv с помощью dataframe.to_csv, вывод будет 11 МБ файлом (который создается мгновенно).

Если, однако, я экспортирую на Microsoft SQL Server с помощью метода to_sql, это занимает от 5 до 6 минут! Нет столбцов - это текст: только int, float, bool и даты. Я видел случаи, когда драйверы ODBC устанавливают nvarchar (max), и это замедляет передачу данных, но это не может быть здесь.

Любые предложения о том, как ускорить процесс экспорта? Взятие 6 минут для экспорта 11 МБ данных делает соединение ODBC практически непригодным.

Спасибо!

Мой код:

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"

engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database)
conn = engine.connect()

metadata = MetaData(conn)

my_data_frame.to_sql(TableName,engine)
4b9b3361

Ответ 1

Метод DataFrame.to_sql генерирует инструкции insert для вашего соединителя ODBC, который затем обрабатывается соединителем ODBC как обычные вставки.

Когда это происходит медленно, это не ошибка pandas.

Сохранение вывода метода DataFrame.to_sql в файл, тогда воспроизведение этого файла по коннектору ODBC займет столько же времени.

Правильный способ массового импорта данных в базу данных - генерировать файл csv, а затем использовать команду load, которая в MS-баре данных SQL называется BULK INSERT

Например:

BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';

Ссылка на синтаксис выглядит следующим образом:

BULK INSERT 
   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] 
      FROM 'data_file' 
     [ WITH 
    ( 
   [ [ , ] BATCHSIZE = batch_size ] 
   [ [ , ] CHECK_CONSTRAINTS ] 
   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] 
   [ [ , ] DATAFILETYPE = 
      { 'char' | 'native'| 'widechar' | 'widenative' } ] 
   [ [ , ] FIELDTERMINATOR = 'field_terminator' ] 
   [ [ , ] FIRSTROW = first_row ] 
   [ [ , ] FIRE_TRIGGERS ] 
   [ [ , ] FORMATFILE = 'format_file_path' ] 
   [ [ , ] KEEPIDENTITY ] 
   [ [ , ] KEEPNULLS ] 
   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ] 
   [ [ , ] LASTROW = last_row ] 
   [ [ , ] MAXERRORS = max_errors ] 
   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] 
   [ [ , ] ROWS_PER_BATCH = rows_per_batch ] 
   [ [ , ] ROWTERMINATOR = 'row_terminator' ] 
   [ [ , ] TABLOCK ] 
   [ [ , ] ERRORFILE = 'file_name' ] 
    )] 

Ответ 2

Вы можете использовать d6tstack, который имеет быстрые функциональные возможности pandas to SQL, потому что он использует собственные команды импорта БД. Поддерживает MS SQL, Postgres и MYSQL

uri_psql = 'postgresql+psycopg2://usr:[email protected]/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
uri_mssql = 'mssql+pymssql://usr:[email protected]/db'
d6tstack.utils.pd_to_mssql(df, uri_mssql, 'table', 'schema') # experimental

Также полезно для импорта нескольких CSV с изменениями схемы данных и/или предварительной обработки пандами перед записью в базу данных, см. Далее в примерах блокнот

d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), 
    apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')

Ответ 3

Мое решение этой проблемы ниже, если это кому-нибудь поможет. Из того, что я прочитал, метод pandas tosql загружает одну запись за раз.

Вы можете сделать массовый оператор вставки, который загружает 1000 строк и фиксирует эту транзакцию вместо фиксации одной строки каждый раз. Это значительно увеличивает скорость.

import pandas as pd
from sqlalchemy import create_engine
import pymssql
import os

connect_string  = [your connection string]
engine = create_engine(connect_string,echo=False)
connection = engine.raw_connection()
cursor = connection.cursor()

def load_data(report_name):
    # my report_name variable is also my sql server table name so I use that variable to create table name string
    sql_table_name = 'AR_'+str(report_name)
    global chunk # to QC chunks that fail for some reason
    for chunk in pd.read_csv(report_full_path_new,chunksize=1000):
        chunk.replace('\'','\'\'',inplace=True,regex=True) #replace single quotes in data with double single quotes to escape it in mysql
        chunk.fillna('NULL',inplace=True)

        my_data = str(chunk.to_records(index=False).tolist()) # convert data to string 
        my_data = my_data[1:-1] # clean up the ends
        my_data = my_data.replace('\"','\'').replace('\'NULL\'','NULL') #convert blanks to NULLS for mysql
        sql_table_name = [your sql server table name]

        sql = """
        INSERT INTO {0} 
        VALUES {1}

         """.format(sql_table_name,my_data)

        cursor.execute(sql)
        # you must call commit() to persist your data if you don't set autocommit to True
        connection.commit()

Ответ 4

У меня недавно была та же проблема, и я хочу добавить ответ на этот вопрос для других. to_sql, кажется, отправляет запрос INSERT для каждой строки, что делает его действительно медленным. Но так как 0.24.0 есть параметр method в pandas.to_sql(), где вы можете определить свою собственную функцию вставки или просто использовать method='multi', чтобы сказать пандам пропускать несколько строк в одном запросе INSERT, что делает его намного быстрее.

Обратите внимание, что ваша база данных может иметь ограничение параметра. В этом случае вы также должны определить размер фрагмента.

Таким образом, решение должно выглядеть следующим образом:

my_data_frame.to_sql(TableName, engine, chunksize=<yourParameterLimit>, method='multi')

Если вы не знаете предела параметров базы данных, попробуйте его без параметра chunksize. Он запустится или выдаст ошибку, сообщающую вам ваш лимит.

Ответ 5

На основании этого ответа - Асем.

Вы можете использовать метод copy_from для имитации массовой загрузки с объектом курсора. Это было проверено на Postgres, попробуйте с вашей БД:

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
from StringIO import StringIO

ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"

engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database) #don't forget to add a password if needed

my_data_frame.head(0).to_sql(TableName, engine, if_exists='replace', index=False)  # create an empty table - just for structure
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
my_data_frame.to_csv(output, sep='\t', header=False, index=False) # a CSV that will be used for the bulk load
output.seek(0)
cur.copy_from(output, TableName, null="")  # null values become ''
conn.commit()
conn.close()
cur.close()