Asked by: Ucha Samadashvili · Asked: 11/14/2023 · Updated: 11/14/2023 · Views: 41
Python script running out of memory
Q:
The following script runs out of memory. I can see that the child processes hold steady at the same memory consumption, but the parent process seems to keep increasing its memory usage. I am trying to understand why this is happening.
import concurrent.futures
import os
import pandas
from sqlalchemy import create_engine, text
from datetime import datetime

db_host = os.environ.get('DB_HOST')
db_port = os.environ.get('DB_PORT')
db_username = os.environ.get('DB_USERNAME')
db_password = os.environ.get('DB_PASSWORD')
db_database = os.environ.get('DB_DATABASE')

# streaming engine: results are fetched through a server-side cursor
engine_str = (create_engine(f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}")
              .execution_options(stream_results=True))
# plain engine used by the workers for the INSERTs
engine = create_engine(f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}")


def process_chunk(chunk):
    # rename the columns of each row and insert it into the target table
    with engine.connect() as con:
        for i in range(len(chunk)):
            row = dict(chunk.loc[i])
            new_row = {}
            for key in row:
                if key == "serial_no":
                    new_row["serial_number"] = row[key]
                elif key == "product_id":
                    new_row["product_id"] = row[key]
                else:
                    new_row["contract_" + key] = row[key]
            new_row["contract_inserted_at"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
            stmt = text("INSERT INTO table_name (col1, col2) values (1, 2)")
            con.execute(stmt, parameters=new_row)
        con.commit()


if __name__ == '__main__':
    with engine_str.connect() as con:
        serials = pandas.read_sql("""SELECT * FROM lamp_contract_dn
                                     WHERE contract_hdr_status in ('Active', 'Expired') AND
                                     serial_no IS NOT NULL AND product_id IS NOT NULL""", con, chunksize=100)
        with concurrent.futures.ProcessPoolExecutor() as executor:
            executor.map(process_chunk, serials)
Note the two different engine configurations. This is necessary because engine_str streams its results through a server-side cursor. If I try to reuse the same engine for the writes, I get a 'DECLARE "c_11396e790_1" CURSOR WITHOUT HOLD FOR' error.
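(For reference, SQLAlchemy also lets stream_results be set per connection rather than per engine, which keeps a single engine while only the reading connection uses a server-side cursor; a minimal sketch, assuming SQLAlchemy 1.4+ and the names from the script above:)

# sketch: one engine; only this read connection streams via a server-side cursor,
# while the write connections opened in process_chunk stay on client-side cursors
with engine.connect().execution_options(stream_results=True) as con:
    serials = pandas.read_sql("SELECT ...", con, chunksize=100)  # same query as above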
I don't think engine_str carries over to the subprocesses, since the subprocesses use a completely different engine. I also tried disposing the engine in a ProcessPoolExecutor() initializer, but I ran into the same problem. See the code below.
def initializer():
    engine.dispose(False)

with concurrent.futures.ProcessPoolExecutor(initializer=initializer) as executor:
    executor.map(process_chunk, serials)
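(For comparison, another common pattern with forked workers is to build a brand-new engine inside each worker via the initializer, so no pooled connections or engine state are inherited from the parent; a sketch, not from the post, where worker_engine and init_worker are hypothetical names:)

# sketch: each worker process creates its own engine instead of reusing the parent's
worker_engine = None  # per-process global, populated by the initializer

def init_worker():
    global worker_engine
    worker_engine = create_engine(
        f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}")

with concurrent.futures.ProcessPoolExecutor(initializer=init_worker) as executor:
    executor.map(process_chunk, serials)  # process_chunk would then use worker_engine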
My expectation was that the main process would only use the memory needed for the next chunk of serials.
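(One detail worth knowing: Executor.map submits every item of its input iterable up front, so the read_sql generator is drained immediately and each chunk sits in the parent process as a pending task argument regardless of how quickly the workers finish, which may explain the parent's growing memory. A bounded-submission helper, a sketch not from the post, keeps only a few chunks in flight at a time:)

import itertools
import concurrent.futures

def map_bounded(executor, fn, iterable, max_in_flight=8):
    # sketch: submit at most max_in_flight tasks, refilling one as each finishes
    it = iter(iterable)
    in_flight = {executor.submit(fn, item)
                 for item in itertools.islice(it, max_in_flight)}
    while in_flight:
        done, in_flight = concurrent.futures.wait(
            in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            fut.result()  # surface any worker exception
            for item in itertools.islice(it, 1):
                in_flight.add(executor.submit(fn, item))

# usage: map_bounded(executor, process_chunk, serials)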
A: No answers yet