如何使用 Rayon 在 iter::from_fn 上并行化迭代?

How can I use Rayon to parallelize iteration over iter::from_fn?

提问人:Miguel Guthridge 提问时间:11/8/2023 最后编辑:Miguel Guthridge 更新时间:11/10/2023 访问量:60

问:

我有一个函数,我用它来生成一个迭代器,如本答案中所述。iter::from_fn

fn generate_iterator<'a>(
    some_arg: Box<dyn SomeTrait>,
    some_other_arg: &'a [InputType],
) -> impl Iterator<Item = OutputType> + 'a {
    // ...
    iter::from_fn(move || {
        // ...
        Some(value)
    })
}

由于这个迭代器可以生成数百万个(如果不是数十亿个)项目,我希望通过使用 Rayon 并行化迭代来加快迭代速度。

// This is far too slow
generate_iterator(&some_arg)
    .filter(|item| some_logic(item))
    .collect()

但是,当我将人造丝添加到代码中时,我收到一个令人困惑的错误

let matches = generate_iterator(&some_arg)
    .par_iter()
    .filter(|item| some_logic(item))
    .collect_vec();
error[E0599]: the method `par_iter` exists for opaque type `impl Iterator<Item = OutputType>`, but its trait bounds were not satisfied
  --> path/to/my/code.rs:92:10
   |
91 |       let matches = generate_iterator(&some_arg)
   |  ___________________-
92 | |         .par_iter()
   | |         -^^^^^^^^ method cannot be called on `impl Iterator<Item = OutputType>` due to unsatisfied trait bounds
   | |_________|
   | 
   |
   = note: the following trait bounds were not satisfied:
           `&impl Iterator<Item = OutputType> + '_: IntoParallelIterator`
           which is required by `impl Iterator<Item = OutputType> + '_: rayon::iter::IntoParallelRefIterator<'_>`

看来我需要实现,这也是这个答案中建议的。但是,由于我没有直接为 Rayon 尝试并行化的迭代器创建类型,因此我不确定如何实现此特征。我想我可能需要在我的返回类型注释中添加一个子句,但我不确定从哪里开始。IntoParallelIteratorwheregenerate_iterator

由于迭代器包含过多的元素,我无法将其收集到中间步骤中。Vec

如何调整我的代码,以便我可以使用 Rayon 轻松地并行化迭代?iter::from_fn

如果缺少任何上下文,以下是一些指向实际代码的永久链接:

Rust 并行处理 迭代器 Rayon

评论

1赞 cafce25 11/8/2023
我严重怀疑这对你有什么好处——无论如何,你的函数都必须运行并生产串联的项目——但par_bridge可能是你正在寻找的方法。
0赞 Miguel Guthridge 11/8/2023
@cafce25这是一个良好的开端,但不幸的是,它最终需要实现 .我尝试实施,看看是否有帮助,但似乎没有区别。impl Iterator<Item = OutputType>SendSendOutputType
0赞 Miguel Guthridge 11/8/2023
澄清一下,我的迭代器确实生成了串联项目 - 这部分相对较好 - 它生成了一堆潜在的状态,然后需要验证这些状态。这只是我想要并行化的验证部分,所以我可以接受串联生成的项目。
0赞 cafce25 11/8/2023
当您的项目不在时,您希望如何在不同的线程(需要并行)中工作?这只有在您最初在不同的线程上生成项目时才能起作用,而这不是可以做到的。Sendfrom_fn
0赞 Miguel Guthridge 11/8/2023
很抱歉造成混淆,我尝试过制作我的项目类型,但似乎没有区别。我想象 Rayon 的处理方式是在一个线程上生成值,然后将它们发送到工作线程进行处理,在主线程上收集过滤器结果。至少,如果人造丝不适合我,那就是我最终会自己做的事情。Send

答:

0赞 Miguel Guthridge 11/10/2023 #1

发生这种情况是因为生成器函数的一个参数没有实现。如果类型实现 ,则生成的 by 会自动实现 Send,并且如果它接收的闭包捕获的所有数据也实现 SendSendIteratoriter::from_fnItemSend

不幸的是,我的一个论点是 ,这意味着 的实现并不明确。Box<dyn SomeTrait>Send

我能够通过指定我的生成器函数的参数需要实现来解决这个问题。Send

let my_arg: Box<dyn SomeTrait + Send> = ...