Reproducibility in multiprocessing where tasks contain randomness

Asked by Progstud · Asked 11/17/2023 · Updated 11/17/2023 · Viewed 34 times

Q:

I have a problem: we had to parallelize a convergence algorithm to make it faster, and from a business point of view its results are satisfactory. But for audit reasons (and because the business requires it...), we need to make it reproducible.

Context:

  • the task uses a random generator
  • tasks take a variable amount of time because they contain many conditionals (so each run dispatches objects to different workers); in the code below I use sleep to mimic this, and see the sketch right after this list
  • the real scale is: NB_WORKER=10, NB_OBJECT=1030, NB_ITER=100
  • Python 3.7 - Windows
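To make the dispatch problem concrete, here is a minimal, hypothetical sketch (not the real algorithm) that records which process handles which object. Because task durations vary, the object-to-worker pairing changes from run to run, which is exactly what breaks per-worker RNG streams:

from multiprocessing import Pool
from time import sleep
import os
import random

def which_worker(i):
    sleep(random.random() / 10)  # variable task duration, like the real conditionals
    return (i, os.getpid())      # record which process handled object i

if __name__ == "__main__":
    with Pool(processes=3) as p:
        print(p.map(which_worker, range(6)))  # pairing differs between runs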

I thought of giving each object_ the responsibility for its own random generator... but I don't know whether that is a good idea, or whether I would lose computation time.

Here is a simplified version of the code:

from multiprocessing import get_context
from time import sleep
from numpy import random as nprng
import random
from datetime import datetime

class Object:
    def __init__(self, id_):
        self.id_ = id_
        self.value = 0


def task(object_):
    sleep(random.random())  # task takes a random amount of time (lots of conditionals in the real code)
    object_.value = rng.uniform()
    return object_

def init_worker(client_id, generators):
    # Each worker atomically claims the next free id and the matching RNG stream
    global rng, worker_id
    with client_id.get_lock():
        worker_id = client_id.value
        rng = nprng.Generator(generators[worker_id])
        client_id.value += 1


if __name__ == "__main__":
    # Init pool and workers, one random generator per worker
    NB_PROCESS = 5
    NB_OBJECT = 34
    NB_ITER = 3

    ctx = get_context("spawn")
    client_ids = ctx.Value('i', 0)
    sequences = [nprng.SeedSequence((1209391983918, worker_id)) for worker_id in range(NB_PROCESS)]
    generators = [nprng.PCG64(seq) for seq in sequences]
    p = ctx.Pool(processes=NB_PROCESS, initializer=init_worker, initargs=(client_ids, generators))
    
    # Parallel tasks

    objects = [Object(i) for i in range(NB_OBJECT)]
    arg = [(object_,) for object_ in objects]

    start = datetime.now()
    total_sum = 0
    for i in range(NB_ITER):
        res = p.starmap(task, arg)
        total_sum += sum(o.value for o in res)

    print(f'Total sum is : {total_sum}')
    end = datetime.now()
    print(end - start)
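(Side note: instead of hand-building the (entropy, worker_id) tuples, NumPy also documents SeedSequence.spawn for deriving independent child streams; a sketch of just that seeding step, assuming the same root entropy:)

from numpy import random as nprng

root = nprng.SeedSequence(1209391983918)
child_sequences = root.spawn(5)  # one independent child sequence per worker (NB_PROCESS)
generators = [nprng.PCG64(seq) for seq in child_sequences]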

So I tried moving the random generator into the object:

from multiprocessing import get_context
from time import sleep
from numpy import random as nprng
import random
from datetime import datetime

class Object:
    def __init__(self, id_, generator):
        self.id_ = id_
        self.value = 0
        self.rng = nprng.Generator(generator)


def task(object_):
    sleep(random.random())  # task takes a random amount of time (lots of conditionals in the real code)
    object_.value = object_.rng.uniform()
    return object_

def init_worker(client_id):
    # Workers still claim an id, but no longer own an RNG stream
    global worker_id
    with client_id.get_lock():
        worker_id = client_id.value
        client_id.value += 1


if __name__ == "__main__":
    # Init pool and workers
    NB_PROCESS = 5
    NB_OBJECT = 1030
    NB_ITER = 3

    ctx = get_context("spawn")
    client_ids = ctx.Value('i', 0)

    p = ctx.Pool(processes=NB_PROCESS, initializer=init_worker, initargs=(client_ids,))

    # Parallel tasks: one random generator per object this time
    sequences = [nprng.SeedSequence((1209391983918, object_id)) for object_id in range(NB_OBJECT)]
    generators = [nprng.PCG64(seq) for seq in sequences]
    objects = [Object(i, generators[i]) for i in range(NB_OBJECT)]
    arg = [(object_,) for object_ in objects]

    start = datetime.now()
    total_sum = 0
    for i in range(NB_ITER):
        res = p.starmap(task, arg)
        total_sum += sum(o.value for o in res)

    print(f'Total sum is : {total_sum}')
    end = datetime.now()
    print(end - start)

But this version seems even more time-consuming, maybe because of the object access (each Generator now has to be pickled along with its object on every call)? Does anyone have other ideas on how to achieve reproducibility when parallel tasks use randomness internally?
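One more variant I have not benchmarked (so this is only a sketch, with a hypothetical ROOT_SEED constant): instead of pickling a Generator together with every object, derive the generator inside the task from the object's id, so the stream depends only on the object and not on which worker executes it:

from numpy import random as nprng

ROOT_SEED = 1209391983918  # hypothetical shared root entropy

def task(object_):
    # Deterministic per-object stream: derived from the object's id alone,
    # so the result is independent of worker assignment and dispatch order
    rng = nprng.default_rng(nprng.SeedSequence([ROOT_SEED, object_.id_]))
    object_.value = rng.uniform()
    return object_

This would avoid shipping a Generator through pickle on every starmap call, though I do not know whether constructing a Generator per task call is actually cheaper than the per-object version above.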

python random parallel-processing multiprocessing

A: No answers yet