如何使用 pyodbc 加快批量插入到 MS SQL Server 的速度

How to speed up bulk insert to MS SQL Server using pyodbc

提问人:TangoAlee 提问时间:4/15/2015 最后编辑:Gord ThompsonTangoAlee 更新时间:10/22/2023 访问量:106531

问:

下面是我的代码,我需要一些帮助。 我必须在 1,300,000 行上运行它,这意味着插入 ~300,000 行最多需要 40 分钟

我认为批量插入是加快速度的途径吗? 还是因为我正在通过 portion 遍历行?for data in reader:

#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

我故意动态选择我的列标题(因为我想创建尽可能多的 pythonic 代码)。

SpikeData123 是表名。

python sql-server csv pyodbc bulkinsert

评论

0赞 zulqarnain 4/15/2015
一旦你知道你的代码工作正常,删除打印,它应该会让它更快。
0赞 Itsme2003 8/5/2020
这不是一个完整的答案,所以我把它留作评论。您可能需要尝试在加载数据时关闭表上的任何索引,然后在插入完成后重新索引表(如果可以的话)。它可能会产生相当大的差异,也可能不会,但尝试不会花很长时间。
0赞 kafran 11/25/2020
我认为使用 Python 批量插入的最佳方法是使用 cTDS 库,如 stackoverflow.com/a/64992905/3147247

答:

1赞 Michael Moura 4/15/2015 #1

是的,批量插入是将大文件加载到数据库中的正确路径。乍一看,我会说它需要这么长时间的原因是,正如您提到的,您正在遍历文件中的每一行数据,这实际上意味着消除了使用批量插入的好处,并使其像普通插入一样。请记住,顾名思义,它用于插入大量数据。 我会删除循环并重试。

此外,我会仔细检查您的批量插入语法,因为它对我来说看起来不正确。检查 pyodbc 生成的 sql,因为我感觉它可能只是在执行正常的插入

或者,如果它仍然很慢,我会尝试直接从 sql 使用批量插入,然后使用批量插入将整个文件加载到临时表中,然后将相关列插入到正确的表中。或者混合使用大容量插入和 bcp 来插入特定列或 OPENROWSET。

52赞 Gord Thompson 4/15/2015 #2

更新 - 2022 年 5 月:bcpandasbcpyaz 是 Microsoft 实用程序的包装器。bcp


更新 - 2019 年 4 月:如@SimonLang的评论中所述,在 SQL Server 2017 及更高版本下,显然支持 CSV 文件中的文本限定符(参考:此处)。BULK INSERT


几乎可以肯定,BULK INSERT 比逐行读取源文件并对每一行执行常规 INSERT 要快得多。但是,BULK INSERT 和 BCP 在 CSV 文件方面都有很大的限制,因为它们无法处理文本限定符(参考:此处)。也就是说,如果您的 CSV 文件中没有限定的文本字符串......

1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07

...那么你可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)......

1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07

...则 BULK INSERT 无法处理它。不过,将这样的 CSV 文件预处理为管道分隔的文件总体上可能会更快......

1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07

...或制表符分隔的文件(其中表示制表符)...

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

...然后批量插入该文件。对于后一个(制表符分隔)文件,BULK INSERT 代码如下所示:

import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\\__tmp\\biTest.txt' WITH (
    FIELDTERMINATOR='\\t',
    ROWTERMINATOR='\\n'
    );
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()

注意:如注释中所述,仅当 SQL Server 实例可以直接读取源文件时,执行语句才适用。有关源文件位于远程客户端上的情况,请参阅此答案BULK INSERT

评论

1赞 TangoAlee 4/16/2015
谢谢Gord!我需要一些后续帮助,但我想说谢谢!
12赞 Gabor 9/20/2017
我知道这是一篇旧文章,但此解决方案仅在文件与 SQL Server 位于同一服务器(或 SQL Server 的服务用户能够看到的位置)时才有效。因此,如果文件驻留在我的工作站上,而 SQL Server 位于其他地方,则此解决方案将不起作用
1赞 Gord Thompson 11/1/2017
@Gabor - 好点子。有关替代方法,请参阅此答案
0赞 Gabor 11/2/2017
好。你知道它是否也与sqlalchemy相同吗?(因为在幕后它使用 pyodbc,对我来说答案是肯定的,但你永远不知道...:-))
1赞 Simon Lang 4/11/2018
自 SQL Server 2017 以来,根据 RFC 4180 支持有效的 CSV。请参阅 learn.microsoft.com/en-us/sql/t-sql/statements/...BULK INSERT
69赞 Gord Thompson 11/1/2017 #3

如对另一个答案的注释中所述,仅当要导入的文件与 SQL Server 实例位于同一台计算机上或位于 SQL Server 实例可以读取的 SMB/CIFS 网络位置时,T-SQL 命令才有效。因此,如果源文件位于远程客户端上,则它可能不适用。BULK INSERT

pyodbc 4.0.19 添加了 Cursor#fast_executemany 功能,在这种情况下可能会有所帮助。 默认为“关闭”,以下测试代码...fast_executemany

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

...在我的测试机器上执行大约需要 22 秒。只需添加...crsr.fast_executemany = True

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

...将执行时间缩短到刚刚超过 1 秒。

评论

6赞 OverflowingTheGlass 12/14/2017
您将如何使用这种方法插入?我尝试作为SQL查询的一部分,但没有用。另外,或文件在您的答案中实际会在哪里?DataFramedf.values.tolist()VALUES.txt..csv
1赞 Gord Thompson 12/14/2017
@CameronTaylor (1) re: DataFrame - 您可能需要将值从对象转换为本机 Python 类型,如本答案所示。(2) re: CSV 文件位置 - 它需要是 Python 应用程序可以读取的位置。从那里,您将信息提取到内存中,创建一个元组列表,然后调用 .numpy.executemany
4赞 Gord Thompson 2/16/2018
@CameronTaylor - 有关与 pandas 一起使用的详细信息,请参阅此答案(通过 SQLAlchemy)。fast_executemany
1赞 Bryan Bailliache 10/12/2018
请注意,pyodbc 4.0.24 的 varchar(max) fast_executemany和列存在问题,导致错误:hy010。请参阅 github.com/mkleehammer/pyodbc/issues/371 归结为将 varchar(max) 更改为 varchar(4000) 为我修复它。
0赞 Umar.H 1/22/2019
Gord你这个可爱的人,谢谢你。在测试我们的产品 dB 之前,我正在本地计算机上测试写入 dB ms SQL 服务器 dB。我花了 8 分钟将 4 列 25k 行(仅限 int)写入我的机器上使用 sql alchemy 的 a dB!
1赞 Randy Stegner Sr. 3/24/2021 #4

这个问题让我感到沮丧,直到我在 SO 上找到这篇文章,我才看到太大的改进。具体来说,Bryan Bailliach 对 max varchar 的评论。我一直在使用 SQLAlchemy,即使确保更好的数据类型参数也没有为我解决问题;但是,切换到 PyODBC 确实如此。我还采纳了迈克尔·莫拉(Michael Moura)的建议,使用临时桌,发现它节省了更多的时间。我写了一个函数,以防有人觉得它有用。我写它是为了获取插入列表或列表列表。我使用 SQLAlchemy 和 Pandas 插入相同的数据,从有时 40 分钟以上缩短到不到 4 秒。不过,我可能误用了我以前的方法。fast_executemanyto_sql

连接

def mssql_conn():
    conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
                          server=os.environ.get('MS_SQL_SERVER'),
                          database='EHT',
                          uid=os.environ.get('MS_SQL_UN'),
                          pwd=os.environ.get('MS_SQL_PW'),
                          autocommit=True)
    return conn

插入功能

def mssql_insert(table,val_lst,truncate=False,temp_table=False):
    '''Use as direct connection to database to insert data, especially for
       large inserts. Takes either a single list (for one row),
       or list of list (for multiple rows). Can either append to table
       (default) or if truncate=True, replace existing.'''
    conn = mssql_conn()
    cursor = conn.cursor()
    cursor.fast_executemany = True
    tt = False
    qm = '?,'
    if isinstance(val_lst[0],list):
        rows = len(val_lst)
        params = qm * len(val_lst[0])
    else:
        rows = 1
        params = qm * len(val_lst)
        val_lst = [val_lst]
    params = params[:-1]
    if truncate:
        cursor.execute(f"TRUNCATE TABLE {table}")
    if temp_table:
        #create a temp table with same schema
        start_time = time.time()
        cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
        table = f"##{table}"
        #set flag to indicate temp table was used
        tt = True
    else:
        start_time = time.time()
    #insert into either existing table or newly created temp table
    stmt = f"INSERT INTO {table} VALUES ({params})"
    cursor.executemany(stmt,val_lst)
    if tt:
        #remove temp moniker and insert from temp table
        dest_table = table[2:]
        cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
        print('Temp table used!')
        print(f'{rows} rows inserted into the {dest_table} table in {time.time() - 
              start_time} seconds')
    else:
        print('No temp table used!')
        print(f'{rows} rows inserted into the {table} table in {time.time() - 
              start_time} seconds')
    cursor.close()
    conn.close()

我的控制台结果首先使用临时表,然后不使用临时表(在这两种情况下,该表在执行时都包含数据,并且 Truncate=True):

No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 
seconds

Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 
seconds
1赞 Jon Morisi 5/3/2021 #5

FWIW,我提供了一些将我自己的测试插入 SQL Server 的方法。实际上,通过使用SQL Server Batches和pyodbcCursor.execute语句,我能够获得最快的结果。我没有测试保存到 csv 和 BULK INSERT,我想知道它如何比较。

这是我关于我所做的测试的博客: http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html

0赞 Rahul Kumar Yadav 2/12/2023 #6

Gord Thompson的回答如下:

# add the below line for controlling batch size of insert
cursor.fast_executemany_rows = batch_size # by default it is 1000