通过 RabbitMQ 在 Rust 中取消长时间运行的人造丝任务的策略 [已关闭]

Strategies for Cancelling Long-Running Rayon Tasks in Rust via RabbitMQ [closed]

提问人:karbonbasedlifeform 提问时间:11/7/2023 最后编辑:karbonbasedlifeform 更新时间:11/8/2023 访问量:67

问:


想改进这个问题吗?更新问题,使其仅通过编辑这篇文章来关注一个问题。

12天前关闭。

我正在 Rust 中开发一个执行器,以处理计算密集型任务,作为更大的 Web 应用程序的一部分。执行器从 RabbitMQ(使用 lapin)接收作业并处理各种受 CPU 限制的操作,其中包括一些内部使用 rayon 的操作。我知道异步会使人造丝的使用复杂化。在计算或任务取消后,它会将结果或取消状态传达回 RabbitMQ。

有问题的操作超出了我的控制范围,主要由长时间运行的并行图算法组成。我正在探索通过单独的 RabbitMQ 取消队列被动取消这些任务的策略,而无需重写底层库,因为它们(据我所知)与异步模式不兼容,并且出于安全原因普遍缺乏内置任务取消支持。

我已经使用 tokio 制作了一个执行器的原型,只是为了在事后认识到人造丝的潜在问题。目前的取消策略是基于 tokio oneshot 通道的,在不释放 CPU 资源的情况下,它会低效地丢弃任务。我正在寻找有关制作一个惯用执行器的建议,该执行器可以有效地取消任务并符合 Rust 的安全保证。

编辑:我会尽量更具体:从本质上讲,我正在寻找一种策略来从另一个线程取消(长时间运行的、阻塞的、受 CPU 限制的)任务,有效地中止计算并释放 CPU。使用 tokio 的 oneshot 通道等机制优雅地发出关机信号不是一种选择,因为我无法控制长时间运行操作的实现。

兔Q 锈-tokio 人造丝

评论

0赞 Aleksander Krauze 11/7/2023
我不太明白你的问题(如果可以的话,请更具体一些),但是如果你想取消异步任务并且正在使用 ,那么看看 tokio-util crate 中的 CancellationTokentokio
0赞 karbonbasedlifeform 11/7/2023
不幸的是,我现在有效地使用了 CancellationToken。问题在于,在这种情况下,昂贵的操作不会产生 CPU。我目前的取消策略与这个例子非常相似。我试图澄清手头的具体问题。谢谢你指出这个问题不够具体!
0赞 Jmb 11/7/2023
AFAIK 如果不修改其代码以包含正常关闭通知机制,例如一次性通道或 .AtomicBool
0赞 Eldar 11/7/2023
不是 Rust 专家,但您可以生成进程并杀死它们以取消。它更易于实施,并且对内存或 CPU 泄漏是安全的
0赞 cafce25 11/7/2023
一般来说,没有线程的配合,就不可能停止线程,更不用说停止线程中运行的任务了。

答:

4赞 Kevin Reid 11/7/2023 #1

停止线程的唯一合理方法是让它与之合作。有多种方法可以做到这一点,但它们都涉及一些普遍存在的东西:

  • Rust 代码可能会故意让步(返回自 ),让执行者或包含 future 有机会决定不再轮询它。asyncPoll::Pendingpoll()
  • 您可以查阅原子标志或消息通道,然后返回或恐慌,而不是继续。
  • 最后,在某些情况下可以进行安排,以便在线程的执行突然终止的情况下,这不会损坏任何东西。但是,这需要完全控制线程执行的所有代码,线程不得调用任何第三方库。我听说过唯一能成功做到这一点的系统是“虚拟机”,其中代码由解释器或 JIT 管理。

因此,考虑到调用你无法控制的代码的约束,你最多可以做的就是让线程继续,直到控制权返回到你编写的代码(此时你可以干净地终止线程)。panic!()

关于如何进行的一些想法:

  • 如果你调用的代码本身使用了 Rayon,那么也许你可以为 Rayon 贡献一个取消功能(或运行一个修补版本),以检查每个或等效版本的取消。rayon::join
  • 请记住,如果代码调用了您提供的任何迭代器,这些迭代器也是您控制的代码,您可以安排恐慌。
  • 将取消标志添加到您正在调用的特定库。

您可能想知道:为什么取消线程会损坏状态?两个例子:

  • 让 Java 弃用它的经典例子是考虑线程所持有的锁会发生什么:如果它们保持锁定状态,那么与锁共享的任何内容本身都会被永远阻塞,从而产生挂起线程的传染;如果它们被解锁,则可能会显示无效的半编辑状态。(在 Rust 中,我们有锁的“中毒”,但这并不能解决例如可能悬空的指针或其他类型的无效值;只有可能由恐慌解除引起的应用程序级情况。Thread.stop()

  • 线程可以从其他线程的堆栈中借用数据;人造丝和这样做。突然取消线程将导致释放后使用。std::thread::scope()

这两件事都可以通过添加适当的检查或禁止它们来解决(例如,使用严格的通道和原子,没有锁),但你必须确保线程执行的所有代码都遵守适当的限制,而 Rust 作为一种没有“VM”或“运行时”的语言,不会在全球范围内施加这样的限制。