Asked by: Progstud  Asked: 11/17/2023  Updated: 11/17/2023  Views: 34
Reproducibility in multiprocessing where the task contains randomness
Q:
I have run into a problem: we had to parallelize a convergence algorithm to make it faster, and from a business standpoint its results are satisfactory. But for audit reasons (and because the business requires it...), we need to make it reproducible.
Context:
- The task uses a random generator
- The task takes a variable amount of time because it contains many conditionals (so each run dispatches objects to different workers); here I use sleep to mimic this behavior
- The real case is: NB_WORKER=10, NB_OBJECT=1030, NB_ITER=100
- Python 3.7 - Windows
I thought about making each object_ responsible for its own random generator... but I don't know whether that is a good idea, or whether it costs computation time.
Here is a simplified version of the code:
from multiprocessing import Pool, get_context
from time import sleep
from numpy import random as nprng
import random
from datetime import datetime

class Object():
    def __init__(self, id_):
        self.id_ = id_
        self.value = 0

def task(object_):
    sleep(random.random())  # the task takes a random amount of time (because of the many conditionals)
    object_.value = rng.uniform()
    return object_

def init_worker(client_id, generators):
    global rng
    global worker_id
    with client_id.get_lock():
        globals()['client_id'] = client_id.value
        worker_id = globals()['client_id']
        rng = nprng.Generator(generators[globals()['client_id']])
        client_id.value += 1

if __name__ == "__main__":
    # Init Pool and workers with random generators
    NB_PROCESS = 5
    NB_OBJECT = 34
    NB_ITER = 3
    ctx = get_context("spawn")
    client_ids = ctx.Value('i', 0)
    sequences = [nprng.SeedSequence((1209391983918, worker_id)) for worker_id in range(NB_PROCESS)]
    generators = [nprng.PCG64(seq) for seq in sequences]
    p = Pool(processes=NB_PROCESS, initializer=init_worker, initargs=(client_ids, generators,))
    # Parallel task
    objects = [Object(i) for i in range(NB_OBJECT)]
    arg = [(object_,) for object_ in objects]
    start = datetime.now()
    total_sum = 0
    for i in range(NB_ITER):
        res = p.starmap(task, arg)
        total_sum += sum([o.value for o in res])
    print(f'Total sum is : {total_sum}')
    end = datetime.now()
    print(end - start)
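To make the failure mode concrete, here is a stripped-down sketch of the same setup (the worker_id tagging in the result is added only for illustration and is not part of my real code): each worker owns one stream, so an object's draw depends on which worker happens to pick it up, and that mapping changes from run to run.

# stripped-down sketch: per-worker streams make an object's draw depend
# on which worker serves it (worker_id tagging added for illustration)
from multiprocessing import get_context
from time import sleep
from numpy import random as nprng
import random

def init_worker(counter, generators):
    global rng, worker_id
    with counter.get_lock():
        worker_id = counter.value
        rng = nprng.Generator(generators[worker_id])
        counter.value += 1

def task(i):
    sleep(random.random())  # variable duration, as in the real code
    return i, worker_id, rng.uniform()

if __name__ == "__main__":
    ctx = get_context("spawn")
    counter = ctx.Value('i', 0)
    gens = [nprng.PCG64(nprng.SeedSequence((1209391983918, w))) for w in range(5)]
    with ctx.Pool(5, initializer=init_worker, initargs=(counter, gens)) as p:
        res = p.map(task, range(34))
    # the (object, worker) pairs change between script runs,
    # and with them every drawn value
    print([(i, w) for i, w, _ in res])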
So I tried moving the random generator into the object itself, so that an object's draws no longer depend on which worker serves it:
from multiprocessing import Pool, get_context
from time import sleep
from numpy import random as nprng
import random
from datetime import datetime

class Object():
    def __init__(self, id_, generator):
        self.id_ = id_
        self.value = 0
        self.rng = nprng.Generator(generator)

def task(object_):
    sleep(random.random())  # the task takes a random amount of time (because of the many conditionals)
    object_.value = object_.rng.uniform()
    return object_

def init_worker(client_id):
    # global rng
    global worker_id
    with client_id.get_lock():
        globals()['client_id'] = client_id.value
        client_id.value += 1

if __name__ == "__main__":
    # Init Pool and workers with random generators
    NB_PROCESS = 5
    NB_OBJECT = 1030
    NB_ITER = 3
    ctx = get_context("spawn")
    client_ids = ctx.Value('i', 0)
    p = Pool(processes=NB_PROCESS, initializer=init_worker, initargs=(client_ids,))
    # Parallel task
    sequences = [nprng.SeedSequence((1209391983918, worker_id)) for worker_id in range(NB_OBJECT)]
    generators = [nprng.PCG64(seq) for seq in sequences]
    objects = [Object(i, generators[i]) for i in range(NB_OBJECT)]
    arg = [(object_,) for object_ in objects]
    start = datetime.now()
    total_sum = 0
    for i in range(NB_ITER):
        res = p.starmap(task, arg)
        total_sum += sum([o.value for o in res])
    print(f'Total sum is : {total_sum}')
    end = datetime.now()
    print(end - start)
But this version seems to take even more time, maybe because of the object accesses? Any other ideas on how to achieve reproducibility when the parallel task uses randomness internally?
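One direction I am considering (a minimal sketch, not benchmarked at real scale): derive one child SeedSequence per object from a single root with spawn(), ship only that lightweight, picklable SeedSequence with each task, and build the Generator inside the worker. Each draw is then a pure function of the object's index, independent of scheduling; for several iterations the iteration index could be folded into the root, e.g. nprng.SeedSequence((1209391983918, iter_idx)).spawn(NB_OBJECT).

from multiprocessing import get_context
from numpy import random as nprng

ROOT_SEED = 1209391983918  # same root seed as above

def task(id_, seed_seq):
    # build the generator inside the worker: only the small SeedSequence
    # is pickled per task, and the draw depends only on id_, not on the worker
    rng = nprng.Generator(nprng.PCG64(seed_seq))
    return id_, rng.uniform()

if __name__ == "__main__":
    NB_PROCESS = 5
    NB_OBJECT = 34
    ctx = get_context("spawn")
    # deterministically derive one independent child sequence per object
    children = nprng.SeedSequence(ROOT_SEED).spawn(NB_OBJECT)
    with ctx.Pool(processes=NB_PROCESS) as p:
        res = p.starmap(task, list(enumerate(children)))
    print(sum(v for _, v in res))  # identical across runs and worker counts

Compared to the second version above, this avoids pickling a full Generator with every object on every call; whether that recovers the lost time I have not measured.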
A: No answers yet