Multi variable in multi processing with one common

Asked by: kilag · Asked: 11/15/2023 · Last edited by: snakecharmerb · Updated: 11/16/2023 · Views: 33

Q:

I have a Python script with a function that needs to be executed through multiprocessing; however, one of the function's arguments (df) is identical for every call. Let me explain with the code:

def main(): 
    for country in all_countries:
        df = SqlManager().sql_query('MyDb', "SELECT * FROM MyTable WHERE Country=''")
        args = [(
            df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
            for timescale in ['month', 'week']
            for adm_code in list(df[lvl].unique())
        ]
        pool = Pool(cpu_count())
        entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args))
     

where

def parallel_func(args):
    return function_to_run(*args)

def function_to_run(df, lvl, timescale, adm_code):
    """my code"""

The problem is that my df is very large (more than 50 million rows), and with this approach I store it in args 10,000 times even though it is exactly the same everywhere. I also don't want to make the SQL call inside my parallel function, because whatever I gained in memory I would lose by fetching the same data many times over.
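To make the cost concrete: pool.imap pickles every element of args before handing it to a worker, so the dataframe is serialized once per task. A quick, illustrative check (using the args list from the snippet above):

import pickle

# Each task tuple embeds the full dataframe, so this is roughly the
# serialized size of df -- a price paid once per task:
print(len(pickle.dumps(args[0])))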

I tried using global variables, but since the work happens in parallel processes, they do not share the same variable.
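With the spawn start method, each worker is a fresh interpreter, so globals set in the parent are invisible to the workers. The usual workaround is a Pool initializer, which ships the dataframe to each worker exactly once; a minimal sketch, where worker_init and the module-level _df are illustrative names, not part of the original code:

from multiprocessing import Pool, cpu_count

_df = None  # set once in each worker by the initializer

def worker_init(df):
    global _df
    _df = df  # pickled and sent once per worker, not once per task

def parallel_func(args):
    lvl, timescale, adm_code = args
    return function_to_run(_df, lvl, timescale, adm_code)

pool = Pool(cpu_count(), initializer=worker_init, initargs=(df,))

This removes the per-task copies, but a 50-million-row frame is still materialized once per worker process, which is what the shared-memory answer below avoids.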

python · sql · multiprocessing · arguments

Comments

0 · CtrlZ · 11/15/2023
Does the table contain 50m rows, or does the query return 50m rows?
0 · kilag · 11/15/2023
Both, I guess; the query returns 50m rows, which are stored into df
0 · CtrlZ · 11/15/2023
You say "guess"?
0 · kilag · 11/15/2023
I mean both: the dataframe has 50m rows because the query returns 50m rows
0 · CtrlZ · 11/15/2023
Why do you have this loop: for country in all_countries

A:

0 · Booboo · 11/16/2023 · #1

The following answer is based on an approach, described in this article, for making a Pandas dataframe sharable among multiple processes. The principal class is SharedPandasDataFrame, used in the following way:

# A sharable instance based on a Numpy array stored in shared memory:
shared_df = SharedPandasDataFrame(df) # sharable instance
# Reconstitute the dataframe from the sharable instance: 
df = shared_df.read()

It is the shared_df instance that must be passed to each process, which can then reconstitute the dataframe from that instance using its read method.

from multiprocessing.shared_memory import SharedMemory
import multiprocessing as mp

import numpy as np
import pandas as pd
from tqdm import tqdm

class SharedNumpyArray:
    '''
    Wraps a numpy array so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, array):
        '''
        Creates the shared memory and copies the array therein
        '''
        # create the shared memory location of the same size of the array
        self._shared = SharedMemory(create=True, size=array.nbytes)

        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape

        # create a new numpy array that uses the shared memory we created.
        # at first, it is filled with zeros
        res = np.ndarray(
            self._shape, dtype=self._dtype, buffer=self._shared.buf
        )

        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self):
        '''
        Reads the array from the shared memory without unnecessary copying.
        '''
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        '''
        Returns a new copy of the array stored in shared memory.
        '''
        return np.copy(self.read())

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._shared.close()
        self._shared.unlink()

class SharedPandasDataFrame:
    '''
    Wraps a pandas dataframe so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, df):
        '''
        Creates the shared memory and copies the dataframe therein
        '''
        self._values = SharedNumpyArray(df.values)
        self._index = df.index
        self._columns = df.columns

    def read(self):
        '''
        Reads the dataframe from the shared memory
        without unnecessary copying.
        '''
        return pd.DataFrame(
            self._values.read(),
            index=self._index,
            columns=self._columns
        )

    def copy(self):
        '''
        Returns a new copy of the dataframe stored in shared memory.
        '''
        return pd.DataFrame(
            self._values.copy(),
            index=self._index,
            columns=self._columns
        )

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._values.unlink()


def parallel_func(tpl):
    # Unpack:
    shared_df, lvl, timescale, adm_code = tpl
    # reconstitute the dataframe:
    df = shared_df.read()
    ...


def main():
    for country in all_countries:
        df = SqlManager().sql_query('MyDb', "SELECT * FROM MyTable WHERE Country=''")
        shared_df = SharedPandasDataFrame(df)
        args = [(
            shared_df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
            for timescale in ['month', 'week']
            for adm_code in list(df[lvl].unique())
        ]
        # If you need to access the sharable dataframe, uncomment the following:
        # df = shared_df.read()

        with mp.Pool() as pool:
            # Add missing )
            entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args)))
        # Destroy shared memory rendering shared_df unusable:
        shared_df.unlink()

if __name__ == '__main__':
    main()
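
For reference, a minimal, self-contained round-trip check of the two classes above; the column name value and the numbers are invented for illustration, and the script assumes the classes are defined in the same file. Note also that SharedNumpyArray shares df.values as a single Numpy array, so the pattern assumes a homogeneous (e.g. all-numeric) dataframe; a mixed-dtype frame yields an object array whose pointers mean nothing in another process.

import multiprocessing as mp

import numpy as np
import pandas as pd

def worker(tpl):
    shared_df, scale = tpl
    df = shared_df.read()  # reattach to the shared buffer; no data copy
    return (df['value'] * scale).sum()

def demo():
    df = pd.DataFrame({'value': np.arange(1_000_000, dtype=np.float64)})
    shared_df = SharedPandasDataFrame(df)
    try:
        with mp.Pool(2) as pool:
            print(pool.map(worker, [(shared_df, s) for s in (1, 2, 3)]))
    finally:
        shared_df.unlink()  # always release the shared block

if __name__ == '__main__':
    demo()

Passing shared_df inside each task tuple stays cheap because multiprocessing.shared_memory.SharedMemory pickles by name, so the workers merely reattach to the existing block.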