提问人:Wouter De Coster 提问时间:8/23/2023 更新时间:8/24/2023 访问量:157
Rust:用人造丝并行化后,不用等到最后,按顺序写出结果
Rust: After parallelization with rayon, write out results in order without waiting until the end
问:
我正在使用 rayon 为某些迭代器并行运行任务(特别是:床文件中的记录,但我认为这并不重要)。最多可以有 ~700.000 个任务。.par_bridge().for_each(|r| { do_work(r) } )
我想打印(stdout 或文件)每次调用 do_work() 的结果,但仅按照原始迭代器的顺序进行打印。我可以在所有并行作业完成后对所有输出进行排序,但将所有结果存储到最后将需要更多的内存。我可以为每个项目添加一个索引,并在完成后打印出第一个,存储其余的直到轮到他们,但我不确定如何最好地实现这样的系统,或者它是否是最好的解决方案。你有什么建议?.enumerate()
答:
1赞
Kevin Reid
8/24/2023
#1
正如@ChayimFriedman所提到的,这不一定可行,因为喜欢从大块开始细分工作,所以顺序不会友好。但是,由于您正在使用 ,人造丝必须从订单中取出物品,因此订单将接近原始订单。因此,在不消耗大量内存的情况下,使用缓冲区和 恢复原始顺序是可行的。rayon
.par_bridge()
Iterator
.enumerate()
这是一个演示程序。
use std::collections::HashMap;
use std::sync::mpsc;
use std::time::Duration;
use rand::Rng;
use rayon::prelude::{ParallelBridge, ParallelIterator};
fn main() {
let data_source = (0..500u32).rev();
// Channel with enough capacity to hold an item from each thread
let (tx, rx) = mpsc::sync_channel(std::thread::available_parallelism().map_or(8, |n| n.get()));
rayon::scope(|s| {
s.spawn(move |_| {
data_source
.enumerate()
.par_bridge()
.map(|(i, value)| {
// pretend to do some work
std::thread::sleep(Duration::from_millis(
1000 + rand::thread_rng().gen_range(0..10),
));
(i, value)
})
.for_each_with(tx, |tx, pair| {
let _ = tx.send(pair);
});
});
recover_order(rx, emit);
});
}
fn emit(item: u32) {
println!("done with {item}");
}
fn recover_order<T>(rx: mpsc::Receiver<(usize, T)>, mut op: impl FnMut(T)) {
let mut next_index: usize = 0;
let mut buffer: HashMap<usize, T> = HashMap::new();
for (i, value) in rx {
if i == next_index {
op(value);
next_index += 1;
while let Some((_, value)) = buffer.remove_entry(&next_index) {
op(value);
next_index += 1;
}
} else {
// Item is out of order
buffer.insert(i, value);
}
}
assert!(buffer.is_empty(), "channel closed with missing items");
println!("Buffer capacity used: {}", buffer.capacity());
}
将项目从 Rayon 控件传输到通道,函数使用通道以按正确顺序调用项目。for_each_with()
recover_order()
emit()
使用 and 允许并行迭代在现有的 Rayon 线程池上“在后台”运行,以便当前线程可以直接处理接收。rayon::scope()
spawn()
for_each_with()
评论
0赞
Chayim Friedman
8/24/2023
我不确定使用这么小的容量是个好主意。它可以通过有效地序列化并行性来破坏并行性的目的。
0赞
Kevin Reid
8/24/2023
@ChayimFriedman啊,我一直在想“接收器会比单个线程快得多”,但这不一定是真的,即使是这样,对可用通道容量的争夺可能仍然是一个问题。我已将其更新为使用 .(当然,如果跟不上每个线程的工作,那么无论如何它都会成为瓶颈;这是问题固有的,不是可以通过选择算法来解决的。available_parallelism()
emit
评论
rayon
通常将迭代器一分为二。所以,第二个线程中的第一个任务基本上是第 350,000 个任务。如果在编写之前阻止等待任务完成,则将有效地禁用并行性的成本。您可以使用单独的线程并使用通道将结果发送给它,但我不确定这是否比将所有结果存储在 a 中并在最后对其进行排序要好得多。Vec
.par_bridge()
Iterator
par_bridge()