提问人:porkbrain 提问时间:11/16/2023 更新时间:11/16/2023 访问量:31
Tokio mpsc 在将发送方分配给静态时关闭通道
Tokio mpsc closes channel when sender assigned to static
问:
我正在努力理解以下代码段的行为:
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
let (sender, mut channel) = unbounded_channel();
GSENDER.set(sender).unwrap();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) // on a new thread
.enable_all()
.build()
.unwrap()
.spawn(async move {
println!("[{:?}] Starting channel", chrono::Utc::now());
while let Some(msg) = channel.recv().await {
println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
}
println!("[{:?}] Closing channel", chrono::Utc::now());
});
// Does not help, as it shouldn't anyway
// std::thread::sleep(std::time::Duration::from_secs(1));
if let Some(channel_in) = GSENDER.get() {
if let Err(SendError(_)) = channel_in.send("test") {
println!("[{:?}] Channel down", chrono::Utc::now());
}
} else {
unreachable!()
}
}
创建新的运行时,生成未来。
然后,正在轮询。
同时,我抓住了发件人的一半并尝试发送消息。
在这一点上,接收者要么被移动到未来,要么(增加睡眠)它甚至正在轮询。recv
recv
为什么发件人报告频道已关闭?
答:
2赞
Chayim Friedman
11/16/2023
#1
当运行时被丢弃时,在 tokio 运行时中生成的所有任务都会关闭(在下一个时间点)。这里的运行时是临时的,因此在语句的末尾删除了它。该任务将只运行到第一个点。.await
.await
将运行时设为一个活变量,它就可以工作了:
use std::sync::OnceLock;
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
let (sender, mut channel) = unbounded_channel();
GSENDER.set(sender).unwrap();
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) // on a new thread
.enable_all()
.build()
.unwrap();
runtime.spawn(async move {
println!("[{:?}] Starting channel", chrono::Utc::now());
while let Some(msg) = channel.recv().await {
println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
}
println!("[{:?}] Closing channel", chrono::Utc::now());
});
if let Some(channel_in) = GSENDER.get() {
if let Err(SendError(_)) = channel_in.send("test") {
println!("[{:?}] Channel down", chrono::Utc::now());
}
} else {
unreachable!()
}
}
游乐场。
评论
0赞
porkbrain
11/16/2023
谢谢,我误读了文档> 调用 spawn 时,提供的 future 将立即开始在后台运行,即使您不等待返回的 JoinHandle。
0赞
cdhowie
11/17/2023
另请注意,不能保证“Recvd”消息将打印此代码,因为在任务被唤醒之前,运行时可能会被删除(返回时)。channel_in.send()
main()
评论