Asked by: kilag · Asked: 11/15/2023 · Last edited by: snakecharmerb, kilag · Updated: 11/16/2023 · Views: 33
Multiple variables in multiprocessing with one common dataframe
Q:

I have a Python script with a function that needs to be executed via multiprocessing; however, one of the function's arguments (df) is the same for every call. Let me explain with code:
from multiprocessing import Pool, cpu_count
from tqdm import tqdm

def main():
    for country in all_countries:
        df = SqlManager().sql_query('MyDb', f"SELECT * FROM MyTable WHERE Country='{country}'")
        args = [(
            df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
          for timescale in ['month', 'week']
          for adm_code in list(df[lvl].unique())
        ]
        pool = Pool(cpu_count())
        entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args)))
where
def parallel_func(args):
    return function_to_run(*args)

def function_to_run(df, lvl, timescale, adm_code):
    """my code"""
The problem is that my df is very large (over 50 million rows), and with this approach I store it in args 10,000 times even though it is exactly the same everywhere. I also don't want to move the SQL call into my parallel function, because whatever I gain in memory I would lose by fetching the same data many times over.

I tried using global variables, but since the work runs in parallel processes, they do not share the same variable.
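A minimal sketch of why this is costly (using a small toy dataframe standing in for the real table): Pool.imap pickles every argument tuple before sending it to a worker, so a dataframe that appears in 10,000 tuples gets serialized 10,000 times.

import pickle
import pandas as pd

# Toy dataframe standing in for the real 50-million-row table
df = pd.DataFrame({'Country': ['FR'] * 100_000, 'value': range(100_000)})
one_task = (df, 'Country', 'month', 'FR')

per_task = len(pickle.dumps(one_task))
print(per_task)            # bytes pickled for a single task: roughly the whole dataframe
print(per_task * 10_000)   # total bytes serialized across 10,000 tasks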
A:

0 votes · Booboo · 11/16/2023 · #1
The following answer is based on a method, described in this article, for making a Pandas dataframe sharable among multiple processes. The principal class is SharedPandasDataFrame, used in the following way:
# A sharable instance based on a Numpy array stored in shared memory:
shared_df = SharedPandasDataFrame(df) # sharable instance
# Reconstitute the dataframe from the sharable instance:
df = shared_df.read()
It is the shared_df instance that must be passed to a process, which can then reconstitute the dataframe from that instance using its read method.
from multiprocessing.shared_memory import SharedMemory
import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
class SharedNumpyArray:
    '''
    Wraps a numpy array so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, array):
        '''
        Creates the shared memory and copies the array therein
        '''
        # create the shared memory location of the same size of the array
        self._shared = SharedMemory(create=True, size=array.nbytes)

        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape

        # create a new numpy array that uses the shared memory we created.
        # at first, it is filled with zeros
        res = np.ndarray(
            self._shape, dtype=self._dtype, buffer=self._shared.buf
        )

        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self):
        '''
        Reads the array from the shared memory without unnecessary copying.
        '''
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        '''
        Returns a new copy of the array stored in shared memory.
        '''
        return np.copy(self.read())

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._shared.close()
        self._shared.unlink()
class SharedPandasDataFrame:
    '''
    Wraps a pandas dataframe so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, df):
        '''
        Creates the shared memory and copies the dataframe therein
        '''
        self._values = SharedNumpyArray(df.values)
        self._index = df.index
        self._columns = df.columns

    def read(self):
        '''
        Reads the dataframe from the shared memory
        without unnecessary copying.
        '''
        return pd.DataFrame(
            self._values.read(),
            index=self._index,
            columns=self._columns
        )

    def copy(self):
        '''
        Returns a new copy of the dataframe stored in shared memory.
        '''
        return pd.DataFrame(
            self._values.copy(),
            index=self._index,
            columns=self._columns
        )

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._values.unlink()
def parallel_func(tpl):
    # Unpack:
    shared_df, lvl, timescale, adm_code = tpl
    # reconstitute the dataframe:
    df = shared_df.read()
    ...
def main():
    for country in all_countries:
        df = SqlManager().sql_query('MyDb', f"SELECT * FROM MyTable WHERE Country='{country}'")
        shared_df = SharedPandasDataFrame(df)
        args = [(
            shared_df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
          for timescale in ['month', 'week']
          for adm_code in list(df[lvl].unique())
        ]
        # If you need to access the sharable dataframe, uncomment the following:
        # df = shared_df.read()
        with mp.Pool() as pool:
            # Add missing )
            entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args)))
        # Destroy shared memory rendering shared_df unusable:
        shared_df.unlink()

if __name__ == '__main__':
    main()
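To see why passing shared_df is cheap, note that pickling the wrapper serializes only the shared-memory block's name plus the dtype/shape/index/columns metadata, never the underlying array data. A quick sketch to verify this, assuming the classes above are in scope (the exact byte counts will vary):

import pickle
import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(1_000_000, 4), columns=list('abcd'))
shared_df = SharedPandasDataFrame(df)

print(len(pickle.dumps(df)))         # tens of megabytes: the full data
print(len(pickle.dumps(shared_df)))  # roughly a kilobyte: metadata only

shared_df.unlink()  # release the shared-memory block when done

One caveat of this scheme: it shares only df.values, so it assumes the dataframe's values form a single numpy block of a fixed-size dtype (e.g. all numeric); object or string columns would place only pointers in shared memory and cannot be shared this way.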