提高 Velocity-Verlet 积分算法的性能

Improve performance of Velocity-Verlet integration algorithm

提问人:Viktor VN 提问时间:6/2/2022 最后编辑:Viktor VN 更新时间:6/3/2022 访问量:983

问:

我正在尝试在势内使用速度-Verlet 积分器对恒星轨道进行积分。但是,我目前的算法每个轨道大约需要 25 秒,我的目标是整合大约 1000 个轨道,这大约需要 7 个小时。因此,我希望有人能帮我优化算法:

Velocity-Verlet 积分器定义为(有关实现,请参阅):verlet_integration()Velocity-Verlet integrator

我想使用简单对数势 (SLP) 对轨道进行积分,定义为:SLP potential

该值始终等于,并且该值可以是 或 (请参阅)。v_01q0.70.9SLP()

通过使用这个电位,可以计算出该积分器所需的加速度(参见apply_forces())

在此之后,我选择作为所有轨道的起始值,以及能量。使用该起始值可以计算(请参阅x = 0E = 0vxcalc_vx())

为了获得足够精确的轨道,时间步长需要更小或更小。我稍后需要分析这些轨道,因为它们需要足够长,因此我在 和 之间积分。1E-4t=0t=200

我需要计算 (y, vy) 的整个允许相空间的值。允许的相空间是该方法不产生负数的平方根的地方。因此,为此需要整合大量的轨道。我希望能够计算出至少 1000 个值。但 10,000 绝对会更理想。虽然也许这要求太多了。 如果您有任何改进性能的想法,请告诉我。使用其他语言不是一种选择,所以请不要建议这样做。calc_vx()

这些轨道的外观示例可以在这里看到:enter image description here

运行代码所需的所有内容都应在下面找到,以及运行代码的起始值。

更新:我已经实施了 mivkov 的建议,这将时间缩短到 9 秒,快了 3 秒,但仍然很慢。仍然欢迎任何其他建议

import numpy as np


def calc_vx(y, vy, q):
    """
    Calculate starting value of x velocity
    """
    vx2 = -np.log((y / q) ** 2) - vy ** 2
    return np.sqrt(vx2)


def apply_forces(x, y, q):
    """
    Apply forces to determine the accelerations
    """
    Fx = -x / (y ** 2 / q ** 2 + x ** 2)
    Fy = -y / (q ** 2 * x ** 2 + y ** 2)
    return Fx, Fy


def verlet_integration(start, dt, steps, q):
    # initialise an array and set the first value to the starting value
    vals = np.zeros((steps, *np.shape(start)))
    vals[0] = start

    # run through all elements and apply the integrator to each value
    for i in range(steps - 1):
        x_vec, v_vec, a_vec = vals[i]
        new_x_vec = x_vec + dt * (v_vec + 0.5 * a_vec * dt)
        new_a_vec = apply_forces(*new_x_vec, q)
        new_v_vec = v_vec + 0.5 * (a_vec + new_a_vec) * dt
        vals[i + 1] = new_x_vec, new_v_vec, new_a_vec

    # I return vals.T so i can use the following to get arrays for the position, velocity and acceleration
    # ((x, vx, ax), (y, vy, ay)) = verlet_integration_vec( ... )
    return vals.T


def integration(y, vy, dt, t0, t1, q):
    # calculate the starting values
    vx = calc_vx(y, vy, q)
    ax, ay = apply_forces(0, y, q)
    start = [(0, y), (vx, vy), (ax, ay)]
    steps = round((t1 - t0) / dt)  # bereken het aantal benodigde stappen

    e = verlet_integration(start, dt, steps, q)  # integreer
    return e

((x, vx, ax), (y, vy, ay)) = integration(0.1, 0.2, 1E-4, 0, 100, 0.7)
python numpy 性能 天文学 verlet 集成

评论

6赞 mivkov 6/2/2022
哦,天哪,这里有很多东西要解开。你的代码看起来很漂亮,很干净,但首先,如果你想要性能,python不是你要走的路。您是否考虑过使用 C 或 Fortran 等 HPC 语言?你可以用它执行计算,并用结果写一个格式化的文件,然后使用 python 快速绘制它。
1赞 mivkov 6/2/2022
除此之外,您还可以做几件事。例如,删除您所做的所有“不必要”的薄片。虽然它们使你的代码看起来不错,但它们使用了不必要的资源。例如:a) 不要在函数中定义函数。b) 返回一个列表。为什么?只需返回两个值。c) 返回一个数组(它需要是一个数组吗?你不能只返回 3 个值吗?最重要的是,你首先定义数组,然后让 numpy 转置它。如果您真的需要,只需将数组定义为转置即可。apply_forcesfunc_verlet
1赞 mivkov 6/2/2022
解决此类问题的方法是使用计时器和时间,确定您在哪些功能上花费了多少时间,并查看您最有可能在哪些方面节省一些时间,以及在哪些方面投入时间进行优化最有效。
1赞 mivkov 6/2/2022
最后,如果您需要更快的速度,还有一些其他技巧。例如,numpy 擅长矢量化操作,但考虑到随着时间的推移你只集成了一个点,这对你没有多大帮助。另一种选择是查看 numba 库。有一些选项可以在使用函数时“预编译”函数,这可能会显著提高性能。
1赞 Viktor VN 6/3/2022
是的,你是对的,对不起,现在应该修复了

答:

2赞 Jérôme Richard 6/3/2022 #1

首先要做的是使用探查器来分析代码中的热点。你可以用它来做到这一点。基本报告显示,几乎所有时间都花在函数上: 和 (称为 200_000 次):cProfile.runx_funcv_funcapply_forces

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   200000    0.496    0.000    0.496    0.000 apply_forces
   199999    0.279    0.000    2.833    0.000 func_verlet
   199999    0.713    0.000    0.713    0.000 x_func
   199999    0.780    0.000    0.780    0.000 v_func
        1    0.000    0.000    0.000    0.000 SLP
   199999    0.222    0.000    0.718    0.000 a_func
        1    0.446    0.446    3.279    3.279 verlet_integration
[...]

快速分析并显示您正在微小数组上调用 Numpy 函数。问题是 Numpy 没有针对处理如此小的数组进行优化(Numpy 对输入进行许多检查,与微小数组的计算时间相比,这些检查成本很高)。x_funcv_func

解决此问题的主要方法是使用在更大的数组上运行的矢量化 Numpy 调用。问题是这个解决方案需要完全重新设计你的代码,以便获得循环,这是问题的根源。for i in range(steps - 1)

另一种解决方案是使用 Numba 以避免 Numpy 开销。这个解决方案要简单得多(尽管如果你的目标是学习 Numpy,你的老师可能不会预料到这一点)。下面是一个示例:

import numpy as np
import numba as nb


@nb.njit
def SLP(x, y, v0, q):
    return 0.5 * v0 ** 2 * np.log(x ** 2 + (y / q) ** 2)


@nb.njit
def calc_vx(x, y, vy, q):
    """
    Calculate starting value of x velocity
    """
    vx2 = -2 * SLP(x, y, 1, q) - vy ** 2
    return np.sqrt(vx2)


@nb.njit
def apply_forces(x, y, v0, q):
    """
    Apply forces to determine the accelerations
    """
    Fx = -(v0 ** 2 * x) / (y ** 2 / q ** 2 + x ** 2)
    Fy = -(v0 ** 2 * y) / (q ** 2 * x ** 2 + y ** 2)
    return np.array([Fx, Fy])


@nb.njit
def x_func(x_vec, v_vec, a_vec, dt):
        return x_vec + dt * (v_vec + 0.5 * a_vec * dt)

@nb.njit
def v_func(v_vec, a_vec, new_a_vec, dt):
    return v_vec + 0.5 * (a_vec + new_a_vec) * dt

@nb.njit
def a_func(x_vec, dt, q):
    x, y = x_vec
    return apply_forces(x, y, 1, q)


# The parameter is a signature of the function that provides the input type to 
# Numba so it can eagerly compile the function and all the dependent functions.
# Please read the Numba documentation for more information about this.
@nb.njit('(float64[:], float64[:], float64[:], float64, float64)')
def func_verlet(x_vec, v_vec, a_vec, dt, q):
    # calculate the new position, velocity and acceleration
    new_x_vec = x_func(x_vec, v_vec, a_vec, dt)
    new_a_vec = a_func(new_x_vec, dt, q)
    new_v_vec = v_func(v_vec, a_vec, new_a_vec, dt)
    out = np.empty((len(new_x_vec), 3))
    out[:,0] = new_x_vec
    out[:,1] = new_v_vec
    out[:,2] = new_a_vec
    return out


def verlet_integration(start, f, dt, steps, q):
    # initialise an array and set the first value to the starting value
    vals = np.zeros((steps, *np.shape(start)))
    vals[0] = start

    # run through all elements and apply the integrator to each value
    for i in range(steps - 1):
        vals[i + 1] = f(*vals[i].T, dt, q)

    # I return vals.T so i can use the following to get arrays for the position, velocity and acceleration
    # ((x, y), (vx, vy), (ax, ay)) = verlet_integration_vec( ... )
    return vals.T


def integration(y, vy, dt, t0, t1, q):
    # calculate the starting values
    x = 0
    vx = calc_vx(x, y, vy, q)
    ax, ay = apply_forces(x, y, 1, q)
    start = [(x, vx, ax), (y, vy, ay)]
    steps = round((t1 - t0) / dt)  # calculate the number of necessary steps

    return verlet_integration(start, func_verlet, dt, steps, q)


((x_, y_), (vx_, vy_), (ax_, ay_)) = integration(0.1, 0.2, 1E-4, 0, 20, 0.7)

请注意,某些函数已被修改,以避免使用 Numba 不支持的函数/运算符。例如,不支持用于解包 Numpy 数组或列表的一元运算符,但无论如何它通常效率很低。*

生成的代码速度提高了 5 倍

探查器现在显示,由于 for 循环(包括运算符和函数调用)的开销,该函数负责执行的主要部分。由于 lambda,这部分不容易移植到 Numba。我认为,如果您成功地重新设计了这部分以避免 lambda 和解包,它可以提高两倍的速度。事实上,即使使用 Numba,对 2 个项目的数组进行操作也是非常低效的。在标量上操作会使代码的可读性降低一些,但速度要快得多(当然,无论有没有 Numba)。我想代码可以再次提高几倍。verlet_integration*


更新:通过更新的代码,Numba 可以更好地提供帮助,因为主要的性能瓶颈现已修复。这是新的Numba版本:

import numpy as np
import numba as nb


@nb.njit
def calc_vx(y, vy, q):
    vx2 = -np.log((y / q) ** 2) - vy ** 2
    return np.sqrt(vx2)


@nb.njit
def apply_forces(x, y, q):
    Fx = -x / (y ** 2 / q ** 2 + x ** 2)
    Fy = -y / (q ** 2 * x ** 2 + y ** 2)
    return np.array([Fx, Fy])


@nb.njit('(float64[:,:], float64, int_, float64)')
def verlet_integration(start, dt, steps, q):
    vals = np.zeros((steps, 3, 2))
    vals[0] = start

    for i in range(steps - 1):
        x_vec, v_vec, a_vec = vals[i]
        new_x_vec = x_vec + dt * (v_vec + 0.5 * a_vec * dt)
        x, y = new_x_vec
        new_a_vec = apply_forces(x, y, q)
        new_v_vec = v_vec + 0.5 * (a_vec + new_a_vec) * dt
        vals[i + 1, 0] = new_x_vec
        vals[i + 1, 1] = new_v_vec
        vals[i + 1, 2] = new_a_vec

    return vals.T


def integration(y, vy, dt, t0, t1, q):
    vx = calc_vx(y, vy, q)
    ax, ay = apply_forces(0, y, q)
    start = [(0, y), (vx, vy), (ax, ay)]
    steps = round((t1 - t0) / dt)

    e = verlet_integration(np.array(start), dt, steps, q)
    return e


((x, vx, ax), (y, vy, ay)) = integration(0.1, 0.2, 1E-4, 0, 100, 0.7)  # 9.7

这比问题的更新代码快 36 倍。在我的机器上只需要 0.27 秒,而初始代码需要 9.7 秒。

评论

0赞 Michael Szczesny 6/3/2022
我有一个与您的解决方案非常相似的解决方案,只是我“内联”apply_forces。你能在你的机器上重现我的基准测试结果吗(在谷歌 colab 上加速 ~2 倍)?
0赞 Jérôme Richard 6/3/2022
@MichaelSzczesny是的,我可以复制它,它确实快了一倍。有关更多信息,请参阅问题中的相关评论:)。
1赞 Viktor VN 6/3/2022
您将如何矢量化 for 循环?我考虑过这一点,但是您需要数组的上一个元素才能获得下一个元素,因此我看不出如何通过矢量化操作来做到这一点。
3赞 Robert Dodier 6/3/2022 #2

我看到有两件事可以在这里提供帮助。

(1) 考虑使用库中的 ODE 求解器,而不是您手动编写的 Verlet 方法。Runge-Kutta 方法及其变体(例如 Runge-Kutta-Fehlberg)广泛适用,我会先尝试一下。库方法很可能是用 C 代码编写的,因此速度要快得多,而且有人已经解决了错误。这两个方面对于这个问题都很有用。

(2) 如果您需要自己编写 ODE 求解器,那么在 Python 中加快实现速度的一个想法是在轨迹上矢量化,以便您可以使用 Numpy 数组操作。也就是说,创建一个表示所有 1000 或 10,000 条轨迹的数组,然后一步一步地推进所有轨迹。我假设你可以用矩阵形式重写运动方程。

顺便说一句,我的建议是保留你已经制定的解决方案,因为它似乎正在工作,并使用想法(1)或(2)开始一个单独的实现,并使用你的原始实现来验证结果。

评论

1赞 Lutz Lehmann 6/3/2022
(2) 仅适用于具有固定时间步长的方法或保证在整个积分间隔内保持接近的解决方案集合。
0赞 Viktor VN 6/3/2022
嗨,谢谢你的评论。第一个不是一个选项,ODE求解器需要我们自己编写。在对 1 个轨道进行了更多改进后,我将时间缩短到 3.5 秒。我还矢量化了该函数,它起作用了。但是我不能对 1000 个轨道进行矢量化操作,因为我的 ram 立即被填满 1000 次 6x2000 万 64 位浮点数,我认为是 16 GB 吗?无论如何,我看到我的内存被填满了,我的电脑死机了。你怎么能解决这个问题?只是将它们分批拆分,还是有更好的方法?
1赞 Robert Dodier 6/3/2022
维克多,好吧,你必须编写自己的求解器,明白了。关于在轨道上矢量化,如果 1000 太多,请尝试 100 甚至 10。此外,尝试将最终时间从 200 秒减少到 2 秒左右,直到代码正常工作。您可以对其进行设置,使轨道的最终值成为 t =(前 t 最终值)到 t =(前 t 最终值 + 少量秒数)的新解的初始条件,然后只存储轨道块,直到处理完所有块。
0赞 Robert Dodier 6/3/2022
@LutzLehmann当然,但OP方程是一个固定时间步长的ODE求解器。我想我可以明确地说,对不起,我懒惰了。
0赞 Lutz Lehmann 6/16/2022
@ViktorVN : 即使所需的时间步长本身很小,您也应该检查在随后的统计评估中实际使用了哪些时间步长。您只能使用相应的更粗略的时间分辨率来保存值,将 10 或 100 个积分步骤组合成一个保存状态。这应该有助于同时轨迹的记忆。
2赞 Michael Szczesny #3

在我的基准测试中,内联apply_forces将 @JérômeRichard 的解决方案速度提高了 ~2 倍。

import numpy as np
import numba as nb   #tested with numba 0.55.2

@nb.njit
def calc_vx(y, vy, q):
    vx2 = -np.log((y / q) ** 2) - vy ** 2
    return np.sqrt(vx2)

@nb.njit
def verlet_integration(start, dt, steps, q):
    vals = np.zeros((steps, 3, 2))
    vals[0] = start

    for i in range(steps - 1):
        x_vec, v_vec, a_vec = vals[i]
        x, y = x_vec + dt * (v_vec + 0.5 * a_vec * dt)
        ax, ay =  -x / (y ** 2 / q ** 2 + x ** 2), -y / (q ** 2 * x ** 2 + y ** 2)
        vx, vy = v_vec[0] + 0.5 * (a_vec[0] + ax) * dt, v_vec[1] + 0.5 * (a_vec[1] + ay) * dt

        vals[i + 1, 0] = x, y
        vals[i + 1, 1] = vx, vy
        vals[i + 1, 2] = ax, ay
    return vals.T

@nb.njit
def integration(y, vy, dt, t0, t1, q):
    vx = calc_vx(y, vy, q)
    ax, ay = 0, -y / y**2
    start = np.array([[0, y], [vx, vy], [ax, ay]])
    steps = round((t1 - t0) / dt) 

    return verlet_integration(start, dt, steps, q)

((x, vx, ax), (y, vy, ay)) = integration(0.1, 0.2, 1E-4, 0, 100, 0.7)

评论

1赞 Lutz Lehmann 6/3/2022
通过设置在循环外部然后计算,可以减少力计算的冗余。q2=q**2fac = -1/(q2*x**2 + y**2); ax,ay = fac*q2*x, fac*y
1赞 Jérôme Richard 6/3/2022
请注意,这并不是内联使它更快,而是操作直接在标量上而不是在 Numpy 数组上完成。Numba 还不能在堆栈上分配数组,因此它们被分配在堆上,这对于只有 2 个项目的数组来说非常昂贵。我希望未来的实现能够实现这一点(这是当前实现中已知的长期问题)。很好的优化!