提问人:jasmine 提问时间:11/17/2023 最后编辑:jasmine 更新时间:11/19/2023 访问量:81
如何使用并行加速优化
How to speed up optimization using parallel
问:
我正在使用 from 进行优化,假设目标函数是 。optimize.minimize
scipy
fun
我需要对我的数据帧的每一行进行优化,目前我正在使用 from :Parallel
joblib
import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax
df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)
num_cores = multiprocessing.cpu_count()-1
def get_attraction(b1,b2, b3):
c1 = 20 * b1 + 12 * b2 + 6 * b3
c2 = 12 * b1 + 24 * b2 + 18 * b3
c3 = 0 * b1 + 14 * b2 + 30 * b3
return c1,c2,c3
def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
b2_tau_l1_a1_a1 = p2_tau * (1 - a)
b3_tau_l1_a1_a1 = p3_tau * (1 - a)
a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1,b3_tau_l1_a1_a1)
b1_tau_l1_a1_a2 = p1_tau * (1 - a)
b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
b3_tau_l1_a1_a2 = p3_tau * (1 - a)
a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2,b3_tau_l1_a1_a2)
b1_tau_l1_a1_a3 = p1_tau * (1 - a)
b2_tau_l1_a1_a3 = p2_tau * (1 - a)
b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3,b3_tau_l1_a1_a3)
a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)
return a_tau_max
def get_signal_conditional_at(at, b1, b2, b3, a):
if at==1:
b1_tau = b1 * (1-a) + a
b2_tau = b2 * (1-a)
b3_tau = b3 * (1-a)
elif at==2:
b1_tau = b1 * (1-a)
b2_tau = b2 * (1-a) + a
b3_tau = b3 * (1-a)
else:
b1_tau = b1 * (1-a)
b2_tau = b2 * (1-a)
b3_tau = b3 * (1-a) + a
return b1_tau, b2_tau, b3_tau
def get_belief(index,row):
if index <= 1118:
df_mask = pd.DataFrame()
df_mask["weight"] = [0.8 ** i for i in range(0, index+1)][::-1]
dem = df_mask["weight"].sum()
subdf = df.iloc[:index].copy()
s1_history = subdf["s1"].values.tolist()
s2_history = subdf["s2"].values.tolist()
s3_history = subdf["s3"].values.tolist()
belief=[]
for at in [[row["b11"], row["b12"], row["b13"]],
[row["b21"], row["b22"], row["b23"]],
[row["b31"], row["b32"], row["b32"]]]:
df_mask["s1"] = s1_history + [at[0]]
df_mask["s2"] = s2_history + [at[1]]
df_mask["s3"] = s3_history + [at[2]]
nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
nom3 = (df_mask["weight"] * df_mask["s3"]).sum()
b1 = nom1 / dem
b2 = nom2 / dem
b3 = nom3 / dem
belief += [b1,b2,b3]
return belief
else:
return [0,0,0,0,0,0,0,0,0]
def get_prob(x, index,row, lamda, a): #
p1, p2, p3 = x[0], x[1], x[2]
p11 = row["p11"]
p12 = row["p12"]
p13 = row["p13"]
b1 = p11 * (1-a) + p1 * a
b2 = p12 * (1-a) + p2 * a
b3 = p13 * (1-a) + p3 * a
c1,c2,c3=get_attraction(b1,b2,b3)
row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11,p12,p13,a)
row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11,p12,p13,a)
row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11,p12,p13,a)
belief = get_belief(index,row)
t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)
a1 = c1+t1
a2 = c2+t2
a3 = c3+t3
nom1 = np.exp(lamda*a1)
nom2 = np.exp(lamda*a2)
nom3 = np.exp(lamda*a3)
dem = nom1 + nom2 + nom3
p1_t = nom1 / dem
p2_t = nom2 / dem
p3_t = nom3 / dem
return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2
def get_root(index,row,lamda,a):
fun = get_prob
#jac_ = jax.jacfwd(fun)
result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=np.random.seed(0),
minimizer_kwargs={'args': (index,row,lamda,a), #'jac':jac_,
'method': "SLSQP",
'tol': 1.0e-4,
'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
'constraints': {"type": 'eq',
'fun': lambda x: 1 - x[0] - x[1] - x[2],
'jac': lambda x: np.full_like(x,-1)}})
x=np.append(result.x, index)
return x
start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5,a=7/13)
for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519
每个优化都是独立的,行之间没有信息交换。
完成整个数据集的优化需要很长时间,而且它只占用了大约 30% 的 CPU(我有 M1 pro)。
我的问题是我应该设置多少个(或者有一些其他方法,例如更改后端)以使其使用 100% CPU,以便我可以更快地完成此程序?n_jobs
我可以访问一个计算机集群,该集群有 2 个 CPU,每个 CPU 有 64 个内核。我试图设置它没有提供显着的改进(我仍在学习如何利用 2 个 CPU..)。n_job=64
更新:
现在我发现提供目标函数的雅可比会减慢优化速度。有 80 行,需要 64 秒,没有它需要 20 秒(CPU 几乎 100%)。但是为什么?我以为提供雅可比会让它更快。jac_
更新:
我添加了一个 and 有 1120 行。如果我不使用 ,使用集群的速度大约是 5 倍。fun
df
jac_
答:
这不是一个多处理问题,而是一个算法复杂性问题。您尝试优化的函数取决于哪个函数相对于数字具有最坏情况的线性复杂度。结果是您的程序具有二次复杂度,这可能是一个巨大的减速。get_prob
get_belief
index
就其本身而言,该功能非常缓慢。问题在于,在优化过程中,优化器会多次调用(和 ),从而减慢整个执行速度。如果删除它或将其替换为常量值(例如,replace by 使昂贵的代码路径短路),您会看到您的程序明显更快。在我的计算机上,这导致了 20 倍的速度。get_belief
get_prob
get_belief
index <= 118
False
但是,可以注意到,它不依赖于优化函数的变量,而只依赖于行参数。这可能意味着您可以预先计算数据帧的每一行,而不是在优化的函数中计算它,从而大大提高执行速度。get_belief
x
fun
get_belief
评论
fun
get_prob
return
(p1_t - p1)**2+(p2_t - p2)**2+(p3_t - p3)**2