Rust:用人造丝并行化后,不用等到最后,按顺序写出结果

Rust: After parallelization with rayon, write out results in order without waiting until the end

提问人:Wouter De Coster 提问时间:8/23/2023 更新时间:8/24/2023 访问量:157

问:

我正在使用 rayon 为某些迭代器并行运行任务(特别是:床文件中的记录,但我认为这并不重要)。最多可以有 ~700.000 个任务。.par_bridge().for_each(|r| { do_work(r) } )

我想打印(stdout 或文件)每次调用 do_work() 的结果,但仅按照原始迭代器的顺序进行打印。我可以在所有并行作业完成后对所有输出进行排序,但将所有结果存储到最后将需要更多的内存。我可以为每个项目添加一个索引,并在完成后打印出第一个,存储其余的直到轮到他们,但我不确定如何最好地实现这样的系统,或者它是否是最好的解决方案。你有什么建议?.enumerate()

色人造丝

评论

1赞 Chayim Friedman 8/23/2023
rayon通常将迭代器一分为二。所以,第二个线程中的第一个任务基本上是第 350,000 个任务。如果在编写之前阻止等待任务完成,则将有效地禁用并行性的成本。您可以使用单独的线程并使用通道将结果发送给它,但我不确定这是否比将所有结果存储在 a 中并在最后对其进行排序要好得多。Vec
0赞 Wouter De Coster 8/23/2023
啊,我明白了。我的问题假设 8 个线程首先处理任务 1 到 8,当第一个线程完成时,它处理任务 9。好吧,那么这似乎不是一个可行的方法。
0赞 Kevin Reid 8/24/2023
@ChayimFriedman不会也不能将其输入分成两半,因为普通人不允许这样做。所以在这种情况下,这是可能的。.par_bridge()Iterator
0赞 Chayim Friedman 8/24/2023
@KevinReid啊,对了,我没看到是.不过,它的效率较低,:)par_bridge()

答:

1赞 Kevin Reid 8/24/2023 #1

正如@ChayimFriedman所提到的,这不一定可行,因为喜欢从大块开始细分工作,所以顺序不会友好。但是,由于您正在使用 ,人造丝必须从订单中取出物品,因此订单将接近原始订单。因此,在不消耗大量内存的情况下,使用缓冲区和 恢复原始顺序是可行的。rayon.par_bridge()Iterator.enumerate()

这是一个演示程序。

use std::collections::HashMap;
use std::sync::mpsc;
use std::time::Duration;

use rand::Rng;
use rayon::prelude::{ParallelBridge, ParallelIterator};

fn main() {
    let data_source = (0..500u32).rev();

    // Channel with enough capacity to hold an item from each thread
    let (tx, rx) = mpsc::sync_channel(std::thread::available_parallelism().map_or(8, |n| n.get()));

    rayon::scope(|s| {
        s.spawn(move |_| {
            data_source
                .enumerate()
                .par_bridge()
                .map(|(i, value)| {
                    // pretend to do some work
                    std::thread::sleep(Duration::from_millis(
                        1000 + rand::thread_rng().gen_range(0..10),
                    ));
                    (i, value)
                })
                .for_each_with(tx, |tx, pair| {
                    let _ = tx.send(pair);
                });
        });

        recover_order(rx, emit);
    });
}

fn emit(item: u32) {
    println!("done with {item}");
}

fn recover_order<T>(rx: mpsc::Receiver<(usize, T)>, mut op: impl FnMut(T)) {
    let mut next_index: usize = 0;
    let mut buffer: HashMap<usize, T> = HashMap::new();
    for (i, value) in rx {
        if i == next_index {
            op(value);
            next_index += 1;
            while let Some((_, value)) = buffer.remove_entry(&next_index) {
                op(value);
                next_index += 1;
            }
        } else {
            // Item is out of order
            buffer.insert(i, value);
        }
    }

    assert!(buffer.is_empty(), "channel closed with missing items");

    println!("Buffer capacity used: {}", buffer.capacity());
}

将项目从 Rayon 控件传输到通道,函数使用通道以按正确顺序调用项目。for_each_with()recover_order()emit()

使用 and 允许并行迭代在现有的 Rayon 线程池上“在后台”运行,以便当前线程可以直接处理接收。rayon::scope()spawn()for_each_with()

评论

0赞 Chayim Friedman 8/24/2023
我不确定使用这么小的容量是个好主意。它可以通过有效地序列化并行性来破坏并行性的目的。
0赞 Kevin Reid 8/24/2023
@ChayimFriedman啊,我一直在想“接收器会比单个线程快得多”,但这不一定是真的,即使是这样,对可用通道容量的争夺可能仍然是一个问题。我已将其更新为使用 .(当然,如果跟不上每个线程的工作,那么无论如何它都会成为瓶颈;这是问题固有的,不是可以通过选择算法来解决的。available_parallelism()emit