提问人:TangoAlee 提问时间:4/15/2015 最后编辑:Gord ThompsonTangoAlee 更新时间:10/22/2023 访问量:106531
如何使用 pyodbc 加快批量插入到 MS SQL Server 的速度
How to speed up bulk insert to MS SQL Server using pyodbc
问:
下面是我的代码,我需要一些帮助。 我必须在 1,300,000 行上运行它,这意味着插入 ~300,000 行最多需要 40 分钟。
我认为批量插入是加快速度的途径吗?
还是因为我正在通过 portion 遍历行?for data in reader:
#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
#hooks csv reader to file
reader = csv.reader(f)
#pulls out the columns (which match the SQL table)
columns = next(reader)
#trims any extra spaces
columns = [x.strip(' ') for x in columns]
#starts SQL statement
query = 'bulk insert into SpikeData123({0}) values ({1})'
#puts column names in SQL query 'query'
query = query.format(','.join(columns), ','.join('?' * len(columns)))
print 'Query is: %s' % query
#starts curser from cnxn (which works)
cursor = cnxn.cursor()
#uploads everything by row
for data in reader:
cursor.execute(query, data)
cursor.commit()
我故意动态选择我的列标题(因为我想创建尽可能多的 pythonic 代码)。
SpikeData123 是表名。
答:
是的,批量插入是将大文件加载到数据库中的正确路径。乍一看,我会说它需要这么长时间的原因是,正如您提到的,您正在遍历文件中的每一行数据,这实际上意味着消除了使用批量插入的好处,并使其像普通插入一样。请记住,顾名思义,它用于插入大量数据。 我会删除循环并重试。
此外,我会仔细检查您的批量插入语法,因为它对我来说看起来不正确。检查 pyodbc 生成的 sql,因为我感觉它可能只是在执行正常的插入
或者,如果它仍然很慢,我会尝试直接从 sql 使用批量插入,然后使用批量插入将整个文件加载到临时表中,然后将相关列插入到正确的表中。或者混合使用大容量插入和 bcp 来插入特定列或 OPENROWSET。
更新 - 2022 年 5 月:bcpandas 和 bcpyaz 是 Microsoft 实用程序的包装器。bcp
更新 - 2019 年 4 月:如@SimonLang的评论中所述,在 SQL Server 2017 及更高版本下,显然支持 CSV 文件中的文本限定符(参考:此处)。BULK INSERT
几乎可以肯定,BULK INSERT 比逐行读取源文件并对每一行执行常规 INSERT 要快得多。但是,BULK INSERT 和 BCP 在 CSV 文件方面都有很大的限制,因为它们无法处理文本限定符(参考:此处)。也就是说,如果您的 CSV 文件中没有限定的文本字符串......
1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07
...那么你可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)......
1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07
...则 BULK INSERT 无法处理它。不过,将这样的 CSV 文件预处理为管道分隔的文件总体上可能会更快......
1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07
...或制表符分隔的文件(其中表示制表符)...→
1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07
...然后批量插入该文件。对于后一个(制表符分隔)文件,BULK INSERT 代码如下所示:
import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\\__tmp\\biTest.txt' WITH (
FIELDTERMINATOR='\\t',
ROWTERMINATOR='\\n'
);
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()
注意:如注释中所述,仅当 SQL Server 实例可以直接读取源文件时,执行语句才适用。有关源文件位于远程客户端上的情况,请参阅此答案。BULK INSERT
评论
BULK INSERT
如对另一个答案的注释中所述,仅当要导入的文件与 SQL Server 实例位于同一台计算机上或位于 SQL Server 实例可以读取的 SMB/CIFS 网络位置时,T-SQL 命令才有效。因此,如果源文件位于远程客户端上,则它可能不适用。BULK INSERT
pyodbc 4.0.19 添加了 Cursor#fast_executemany 功能,在这种情况下可能会有所帮助。 默认为“关闭”,以下测试代码...fast_executemany
cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")
sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')
...在我的测试机器上执行大约需要 22 秒。只需添加...crsr.fast_executemany = True
cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")
crsr.fast_executemany = True # new in pyodbc 4.0.19
sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')
...将执行时间缩短到刚刚超过 1 秒。
评论
DataFrame
df.values.tolist()
VALUES
.txt.
.csv
numpy
.executemany
fast_executemany
这个问题让我感到沮丧,直到我在 SO 上找到这篇文章,我才看到太大的改进。具体来说,Bryan Bailliach 对 max varchar 的评论。我一直在使用 SQLAlchemy,即使确保更好的数据类型参数也没有为我解决问题;但是,切换到 PyODBC 确实如此。我还采纳了迈克尔·莫拉(Michael Moura)的建议,使用临时桌,发现它节省了更多的时间。我写了一个函数,以防有人觉得它有用。我写它是为了获取插入列表或列表列表。我使用 SQLAlchemy 和 Pandas 插入相同的数据,从有时 40 分钟以上缩短到不到 4 秒。不过,我可能误用了我以前的方法。fast_executemany
to_sql
连接
def mssql_conn():
conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
server=os.environ.get('MS_SQL_SERVER'),
database='EHT',
uid=os.environ.get('MS_SQL_UN'),
pwd=os.environ.get('MS_SQL_PW'),
autocommit=True)
return conn
插入功能
def mssql_insert(table,val_lst,truncate=False,temp_table=False):
'''Use as direct connection to database to insert data, especially for
large inserts. Takes either a single list (for one row),
or list of list (for multiple rows). Can either append to table
(default) or if truncate=True, replace existing.'''
conn = mssql_conn()
cursor = conn.cursor()
cursor.fast_executemany = True
tt = False
qm = '?,'
if isinstance(val_lst[0],list):
rows = len(val_lst)
params = qm * len(val_lst[0])
else:
rows = 1
params = qm * len(val_lst)
val_lst = [val_lst]
params = params[:-1]
if truncate:
cursor.execute(f"TRUNCATE TABLE {table}")
if temp_table:
#create a temp table with same schema
start_time = time.time()
cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
table = f"##{table}"
#set flag to indicate temp table was used
tt = True
else:
start_time = time.time()
#insert into either existing table or newly created temp table
stmt = f"INSERT INTO {table} VALUES ({params})"
cursor.executemany(stmt,val_lst)
if tt:
#remove temp moniker and insert from temp table
dest_table = table[2:]
cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
print('Temp table used!')
print(f'{rows} rows inserted into the {dest_table} table in {time.time() -
start_time} seconds')
else:
print('No temp table used!')
print(f'{rows} rows inserted into the {table} table in {time.time() -
start_time} seconds')
cursor.close()
conn.close()
我的控制台结果首先使用临时表,然后不使用临时表(在这两种情况下,该表在执行时都包含数据,并且 Truncate=True):
No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343
seconds
Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787
seconds
FWIW,我提供了一些将我自己的测试插入 SQL Server 的方法。实际上,通过使用SQL Server Batches和pyodbcCursor.execute语句,我能够获得最快的结果。我没有测试保存到 csv 和 BULK INSERT,我想知道它如何比较。
这是我关于我所做的测试的博客: http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html
Gord Thompson的回答如下:
# add the below line for controlling batch size of insert
cursor.fast_executemany_rows = batch_size # by default it is 1000
评论