Python 脚本内存不足

Python script running out of memory

提问人:Ucha Samadashvili 提问时间:11/14/2023 更新时间:11/14/2023 访问量:41

问:

以下脚本内存不足。我可以看到子进程保持相同的消耗,但父进程似乎不断增加内存使用量。我试图理解为什么会发生这种情况。

import concurrent.futures
import os
import pandas
from sqlalchemy import create_engine, text
from datetime import datetime

db_host = os.environ.get('DB_HOST')
db_port = os.environ.get('DB_PORT')
db_username = os.environ.get('DB_USERNAME')
db_password = os.environ.get('DB_PASSWORD')
db_database = os.environ.get('DB_DATABASE')
engine_str = (create_engine(f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}")
              .execution_options(stream_results=True))
engine = create_engine(f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}")


def process_chunk(chunk):
    with engine.connect() as con:
        for i in range(len(chunk)):
            row = dict(chunk.loc[i])
            new_row = {}
            for key in row:
                if key == "serial_no":
                    new_row["serial_number"] = row[key]
                elif key == "product_id":
                    new_row["product_id"] = row[key]
                else:
                    new_row["contract_" + key] = row[key]
            new_row["contract_inserted_at"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

            stmt = text("INSERT INTO table_name (col1, col2) values (1, 2)")

            con.execute(stmt, parameters=new_row)
            con.commit()


if __name__ == '__main__':
    with engine_str.connect() as con:
        serials = pandas.read_sql("""SELECT * FROM lamp_contract_dn
            WHERE contract_hdr_status in ('Active', 'Expired') AND 
            serial_no IS NOT NULL AND product_id IS NOT NULL""", con, chunksize=100)

        with concurrent.futures.ProcessPoolExecutor() as executor:
            executor.map(process_chunk, serials)

注:2 不同的引擎配置。这是必要的,因为engine_str使用远程光标对结果进行流式处理。如果我尝试重复使用相同的引擎进行写入,我似乎收到“declARE”c_11396e790_1“CURSOR WITHOUT HOLD FOR”错误。

我认为engine_str不会转移到子进程,因为子进程使用完全不同的引擎。我尝试将引擎作为 ProcessPoolExecutor() 的初始值设定项进行处理,但我遇到了同样的问题。请参阅下面的代码。

def initializer():
    engine.dispose(False)

with concurrent.futures.ProcessPoolExecutor(initializer=initializer) as executor:
            executor.map(process_chunk, serials)

我的期望是主进程只会使用下一个(序列)所需的内存

python-3.x pandas sqlalchemy 并发

评论

0赞 vahvero 11/15/2023
在RAM中保存的数据量是否太大,无法首先将所有输入生成到列表中,然后使用PostGres批量创建将所有行插入数据库?否则,您也可以在迭代器的某些部分进行批量创建。
0赞 Ucha Samadashvili 11/15/2023
@vahvero是的,我尝试流式传输的数据太大了。请在我的迭代器的某些部分详细说明批量创建的含义。这将如何减少内存消耗?
0赞 vahvero 11/15/2023
我的意思是,您可以执行一组大小,批量创建,然后继续下一步,而不是尝试这种并发方法。在循环中调用 commit 应该非常昂贵。此方法应具有恒定的内存大小。N

答: 暂无答案