'tokio::try_join!' 在 'JoinHandle 的向量上

`tokio::try_join!` on a vector of `JoinHandle`s

提问人:Mike Altonji 提问时间:11/9/2023 更新时间:11/9/2023 访问量:72

问:

Rust 新手。我想等待以下异步任务完成:

let _ = tokio::try_join!(fetch_handle, evaluate_handles[0], evaluate_handles[1], evaluate_handles[2], evaluate_handles[3]);

有没有办法在不对每个项目进行硬编码的情况下做到这一点? 是一个 ,并且只是一个evaluate_handlesevaluate_handlesVec<JoinHandle<()>>fetch_handleJoinHandle<()>

锈-tokio

评论


答:

2赞 cdhowie 11/9/2023 #1

为此,你可以将所有 s 放入 FuturesUnordered 中,然后将其作为 :JoinHandleStream

use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let tasks = FuturesUnordered::new();

    for i in 0..10 {
        tasks.push(tokio::spawn(async move {
            println!("task {i} starting");
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("task {i} finished");
        }));
    }

    println!("joining tasks");

    tasks.for_each(|_| async {}).await;

    println!("joined all tasks");
}

(操场)

请注意,你可以进入一个,所以如果你能将任务表达为迭代器,你可以使代码变得简单得多。collectFuturesUnorderedJoinHandles

use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let tasks: FuturesUnordered<_> = (0..10)
        .map(|i| {
            tokio::spawn(async move {
                println!("task {i} starting");
                tokio::time::sleep(Duration::from_secs(1)).await;
                println!("task {i} finished");
            })
        })
        .collect();

    println!("joining tasks");

    tasks.for_each(|_| async {}).await;

    println!("joined all tasks");
}

(操场)

评论

0赞 Caesar 11/9/2023
这与@OP的行为略有不同:考虑一下这个游乐场。 返回第一个错误时中止,等待所有错误。此外,由于您已经在生成,因此实际上没有必要,您可以等待循环中的连接句柄以获得相同的结果。(我也对“简单得多”;P)笑了笑try_joinFuturesUnorderedFuturesUnordered
0赞 Caesar 11/9/2023
嗯。我必须放弃。行为差异是存在的,但它无关紧要:被加入的东西是 s,只有当产生的未来恐慌时才会加入,@OP可能并不关心。(我的游乐场应该看起来像这样,以证明差异。JoinHandleErr
1赞 cdhowie 11/9/2023
如果他们确实关心任务本身的恐慌或错误,则可以使用 而不是 .但也有可能推迟到所有任务完成也很重要,所以有一些未知数。无论如何,它为解决这些用例的任意组合提供了基础,即使有一种更简单的方法来表达其中的一些用例。try_for_eachfor_eachFuturesUnordered
0赞 Caesar 11/9/2023
try_for_each对我来说并不明显(可以实现 TryStream 的事实也让我感到惊讶)。像这样的例子可能已经解释了更好的灵活性。FuturesUnorderedwhile let Some(completed) = tasks.next().await { /* handle task results in order of completion */ }FuturesUnordered
1赞 cdhowie 11/9/2023
@Caesar我不确定它有什么不明显的。它返回一个你忽略的(编译器也会警告你:),所以即使有人惊慌失措,加入所有任务似乎也就不足为奇了。您需要以某种方式处理,例如:play.rust-lang.org/...Resultwarning: unused Result that must be usedResult
2赞 Caesar 11/9/2023 #2

一般来说,你可以用在任务(futures)的迭代器上做:TryJoinAlltry_join!

use futures::future::TryJoinAll;
let join_evaluate_handles = evaluate_handles.into_iter().collect::<TryJoinAll<_>>();

在您的例子中,所有连接句柄都具有相同的类型,因此您可以构造一个包含所有句柄的迭代器:

evaluate_handles.into_iter()
    .chain(once(fetch_handle))
    .collect::<TryJoinAll<_>>()
    .await

如果它们不是同一类型,则将组合和:TryJoinAlltry_join!

try_join!(fetch_handle, join_evaluate_handles /* from above */)

但是请注意,我不确定在这里使用它是否非常有意义。的要点是,如果其中一个期货错误,它会立即返回。但是你的未来是 s,只有当生成的任务崩溃时才会出错:Playground Now,在生成的未来中恐慌会使 tokio 打印出一条丑陋的错误消息,你通常希望避免这种情况。我能想到的唯一实际用例是,如果某处发生恐慌,请中止您的应用程序。但我认为没有那么多麻烦的方法可以做到这一点。try_jointry_joinJoinHandletry_join


还有一点需要注意:如果您使用的是 或 ,则一开始就不需要您的任务。TryJoinAllFuturesUnorderedtokio::spawn