如何使用并行加速优化

How to speed up optimization using parallel

提问人:jasmine 提问时间:11/17/2023 最后编辑:jasmine 更新时间:11/19/2023 访问量:81

问:

我正在使用 from 进行优化,假设目标函数是 。optimize.minimizescipyfun

我需要对我的数据帧的每一行进行优化,目前我正在使用 from :Paralleljoblib

import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax

df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)


num_cores = multiprocessing.cpu_count()-1

def get_attraction(b1,b2, b3):
    c1 = 20 * b1 + 12 * b2 + 6 * b3
    c2 = 12 * b1 + 24 * b2 + 18 * b3
    c3 = 0 * b1 + 14 * b2 + 30 * b3
    return c1,c2,c3

def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
    b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
    b2_tau_l1_a1_a1 = p2_tau * (1 - a)
    b3_tau_l1_a1_a1 = p3_tau * (1 - a)

    a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1,b3_tau_l1_a1_a1)

    b1_tau_l1_a1_a2 = p1_tau * (1 - a)
    b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
    b3_tau_l1_a1_a2 = p3_tau * (1 - a)
    a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2,b3_tau_l1_a1_a2)

    b1_tau_l1_a1_a3 = p1_tau * (1 - a)
    b2_tau_l1_a1_a3 = p2_tau * (1 - a)
    b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
    a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3,b3_tau_l1_a1_a3)

    a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)

    return a_tau_max

def get_signal_conditional_at(at, b1, b2, b3, a):

    if at==1:
        b1_tau = b1 * (1-a) + a
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a)
    elif at==2:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a) + a
        b3_tau = b3 * (1-a)
    else:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a) + a
    return b1_tau, b2_tau, b3_tau

def get_belief(index,row):
    if index <= 1118:
        df_mask = pd.DataFrame()
        df_mask["weight"] = [0.8 ** i for i in range(0, index+1)][::-1]
        dem = df_mask["weight"].sum()

        subdf = df.iloc[:index].copy()
        s1_history = subdf["s1"].values.tolist()
        s2_history = subdf["s2"].values.tolist()
        s3_history = subdf["s3"].values.tolist()

        belief=[]
        for at in [[row["b11"], row["b12"], row["b13"]],
                    [row["b21"], row["b22"], row["b23"]],
                    [row["b31"], row["b32"], row["b32"]]]:

            df_mask["s1"] = s1_history + [at[0]]
            df_mask["s2"] = s2_history + [at[1]]
            df_mask["s3"] = s3_history + [at[2]]

            nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
            nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
            nom3 = (df_mask["weight"] * df_mask["s3"]).sum()

            b1 = nom1 / dem
            b2 = nom2 / dem
            b3 = nom3 / dem
            belief += [b1,b2,b3]
        return belief
    else:
        return [0,0,0,0,0,0,0,0,0]


def get_prob(x, index,row, lamda, a): #
    p1, p2, p3 = x[0], x[1], x[2]

    p11 = row["p11"]
    p12 = row["p12"]
    p13 = row["p13"]

    b1 = p11 * (1-a) + p1 * a
    b2 = p12 * (1-a) + p2 * a
    b3 = p13 * (1-a) + p3 * a

    c1,c2,c3=get_attraction(b1,b2,b3)

    row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11,p12,p13,a)
    row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11,p12,p13,a)
    row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11,p12,p13,a)

    belief = get_belief(index,row)

    t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
    t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
    t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)

    a1 = c1+t1
    a2 = c2+t2
    a3 = c3+t3

    nom1 = np.exp(lamda*a1)
    nom2 = np.exp(lamda*a2)
    nom3 = np.exp(lamda*a3)
    dem = nom1 + nom2 + nom3

    p1_t = nom1 / dem
    p2_t = nom2 / dem
    p3_t = nom3 / dem

    return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2

def get_root(index,row,lamda,a):
        fun = get_prob
        #jac_ = jax.jacfwd(fun)
        result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=np.random.seed(0),
                              minimizer_kwargs={'args': (index,row,lamda,a), #'jac':jac_,
                                                'method': "SLSQP",
                                                'tol': 1.0e-4,
                                                'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
                                                'constraints': {"type": 'eq',
                                                                'fun': lambda x: 1 - x[0] - x[1] - x[2],
                                                                'jac': lambda x: np.full_like(x,-1)}})
        x=np.append(result.x, index)
        return x

start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5,a=7/13)
                                  for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519

每个优化都是独立的,行之间没有信息交换。

完成整个数据集的优化需要很长时间,而且它只占用了大约 30% 的 CPU(我有 M1 pro)。

我的问题是我应该设置多少个(或者有一些其他方法,例如更改后端)以使其使用 100% CPU,以便我可以更快地完成此程序?n_jobs

我可以访问一个计算机集群,该集群有 2 个 CPU,每个 CPU 有 64 个内核。我试图设置它没有提供显着的改进(我仍在学习如何利用 2 个 CPU..)。n_job=64

更新: 现在我发现提供目标函数的雅可比会减慢优化速度。有 80 行,需要 64 秒,没有它需要 20 秒(CPU 几乎 100%)。但是为什么?我以为提供雅可比会让它更快。jac_

更新: 我添加了一个 and 有 1120 行。如果我不使用 ,使用集群的速度大约是 5 倍。fundfjac_

python-3.x 优化 并行 多处理 joblib

评论

0赞 Jérôme Richard 11/17/2023
请注意,M1 处理器是一个大而小的处理器。这种架构为并行编程增加了很多复杂性。例如,进程可能被安排在错误的内核上,或者同步的成本可能要高得多。使用更多的作业通常会使事情变慢,因为需要在同一内核上调度多个进程。在内核上从一个进程切换到另一个进程的成本很高。作业数不应大于 x2 核心数。
0赞 Louis Lac 11/18/2023
@JérômeRichard 这些肯定是否得到了任何相关绩效基准或观察的支持?此外,即使这是真的(“[M1] 架构在并行编程中增加了很多复杂性”),我仍然看不出这将如何应用于 OP 问题,因为您没有在其程序中显示任何这种现象的证据。请注意,OP 也在另一台机器(可能不是 M1)上进行了尝试,并且在其代码中,内核数量已经“不大于内核数量的 2 倍”。
0赞 Jérôme Richard 11/19/2023
@LouisLac嗯,这是非常基本的知识。任何具有静态调度的统一分叉连接并行代码在 big-small CPU 上都存在问题。例如,您可以使用 OpenMP 示例进行测试。这同样适用于使用相同模型(但使用进程)的 Python。这并不特定于 M1 :最近的 Intel CPU 现在有同样的问题(例如。桤木湖)。顺便说一句,这是英特尔构建 ThreadDirector 的原因之一。我同意没有强有力的证据表明这是问题所在(因此发表此评论而不是答案)。这只是基于以前的帖子的疯狂猜测,等待更多信息
0赞 Jérôme Richard 11/19/2023
@jasmine 数据帧中有多少行?如果行数较多,作业提交速度可能比并行计算慢(因为数据需要酸洗+发送+未酸洗)。您能否提供一个最小的可重复示例分析信息
0赞 jasmine 11/19/2023
@JérômeRichard在这里,除了我更改为最小化。我有 1120 行,我有 24 个这样的数据集。到目前为止,当我使用集群(使用AMD节点)时,我使用上面的代码并提交一个作业。funget_probreturn(p1_t - p1)**2+(p2_t - p2)**2+(p3_t - p3)**2

答:

1赞 Louis Lac 11/19/2023 #1

这不是一个多处理问题,而是一个算法复杂性问题。您尝试优化的函数取决于哪个函数相对于数字具有最坏情况的线性复杂度。结果是您的程序具有二次复杂度,这可能是一个巨大的减速。get_probget_beliefindex

就其本身而言,该功能非常缓慢。问题在于,在优化过程中,优化器会多次调用(和 ),从而减慢整个执行速度。如果删除它或将其替换为常量值(例如,replace by 使昂贵的代码路径短路),您会看到您的程序明显更快。在我的计算机上,这导致了 20 倍的速度。get_beliefget_probget_beliefindex <= 118False

但是,可以注意到,它不依赖于优化函数的变量,而只依赖于行参数。这可能意味着您可以预先计算数据帧的每一行,而不是在优化的函数中计算它,从而大大提高执行速度。get_beliefxfunget_belief

评论

0赞 jasmine 11/20/2023
你拯救了我的一天!!!