Tokio mpsc 在将发送方分配给静态时关闭通道

Tokio mpsc closes channel when sender assigned to static

提问人:porkbrain 提问时间:11/16/2023 更新时间:11/16/2023 访问量:31

问:

我正在努力理解以下代码段的行为:

use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();
    
    GSENDER.set(sender).unwrap();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());

            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
       
    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));
        
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

链接到游乐场进行重现

创建新的运行时,生成未来。 然后,正在轮询。 同时,我抓住了发件人的一半并尝试发送消息。 在这一点上,接收者要么被移动到未来,要么(增加睡眠)它甚至正在轮询。recvrecv

为什么发件人报告频道已关闭?

锈-tokio

评论


答:

2赞 Chayim Friedman 11/16/2023 #1

当运行时被丢弃时,在 tokio 运行时中生成的所有任务都会关闭(在下一个时间点)。这里的运行时是临时的,因此在语句的末尾删除了它。该任务将只运行到第一个点。.await.await

将运行时设为一个活变量,它就可以工作了:

use std::sync::OnceLock;
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();

    GSENDER.set(sender).unwrap();

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap();
    runtime.spawn(async move {
        println!("[{:?}] Starting channel", chrono::Utc::now());

        while let Some(msg) = channel.recv().await {
            println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
        }

        println!("[{:?}] Closing channel", chrono::Utc::now());
    });

    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

游乐场

评论

0赞 porkbrain 11/16/2023
谢谢,我误读了文档> 调用 spawn 时,提供的 future 将立即开始在后台运行,即使您不等待返回的 JoinHandle。
0赞 cdhowie 11/17/2023
另请注意,不能保证“Recvd”消息将打印此代码,因为在任务被唤醒之前,运行时可能会被删除(返回时)。channel_in.send()main()