Asked by: Fred Hors · Asked: 10/17/2023 · Updated: 10/18/2023 · Views: 53
How to .await inside .retain callback?
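Q:
I'm using the code below with unbounded mpsc channels, and it works very well.
Now I need to use bounded mpsc channels, and it no longer works, because I now have to `.await` on `.send()` since the channel may be full.
Because I want to remove empty `Vec`s and empty `HashMap` keys, I'm using `retain`, which unfortunately doesn't support async callbacks: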
pub async fn broadcast(&self, message: Message) {
    let clients = self.clients.clone();
    let message = Arc::new(message);
    tokio::spawn(async move {
        let mut clients = clients.lock().unwrap();
        let players = clients.get_mut(&message.team_id).unwrap();
        // I would like to remove dropped connections and remove empty Vec too below
        // It works with unbounded channels because I don't need to ".await" on it, now with bounded channel I need to ".await" because it can be full
        players.retain(|_, emitters| {
            emitters.retain(|emitter| {
                // Better logic here...
                emitter.sender.send(message.clone()).await.is_ok()
            });
            !emitters.is_empty()
        });
    });
}
REPL:https://www.rustexplorer.com/b/hh5m79
Code:
/*
[dependencies]
axum = { version = "0.6.20" }
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1.32.0", default-features = false, features = [
    "macros",
    "process",
    "rt-multi-thread",
] }
tokio-stream = { version = "0.1.14", default-features = false, features = [
    "sync",
] }
*/

use axum::{
    extract::State,
    response::{
        sse::{Event, KeepAlive, Sse},
        Html,
    },
    routing::get,
    Router,
};
use futures::stream::Stream;
use std::{
    collections::HashMap,
    convert::Infallible,
    sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

type TeamId = String;
type PlayerId = String;

#[derive(Default)]
pub struct Broadcaster {
    clients: Arc<Mutex<HashMap<TeamId, HashMap<PlayerId, Vec<Connection>>>>>,
}

pub struct Connection {
    session_id: String,
    player_id: String,
    sender: mpsc::Sender<Arc<Message>>,
}

pub struct Message {
    pub team_id: TeamId,
    pub session_id: String,
    pub message: String,
}

struct AppState {
    broadcaster: Arc<Broadcaster>,
}

#[tokio::main]
async fn main() {
    let broadcaster = Arc::new(Broadcaster::default());
    let app_state = Arc::new(AppState { broadcaster });
    let app = Router::new()
        .route("/send_message", get(send_message))
        .route("/sse", get(sse_handler))
        .with_state(app_state);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn send_message(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    let new_fake_message = Message {
        team_id: "fake_one".to_string(),
        session_id: "fake_one".to_string(),
        message: "fake_one".to_string(),
    };
    app_state.broadcaster.broadcast(new_fake_message).await;
    Html("Message sent")
}

async fn sse_handler(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = app_state
        .broadcaster
        .add_client("fake_one", "fake_one", "fake_one")
        .await;
    let mystream = ReceiverStream::<Arc<Message>>::new(rx)
        .map(|res| Ok(Event::default().data(res.message.to_string())));
    Sse::new(mystream).keep_alive(KeepAlive::default())
}

impl Broadcaster {
    pub async fn add_client(
        &self,
        session_id: &str,
        team_id: &str,
        player_id: &str,
    ) -> mpsc::Receiver<Arc<Message>> {
        let (tx, rx) = mpsc::channel::<Arc<Message>>(10);
        let mut clients = self.clients.lock().unwrap();
        if !clients.contains_key(team_id) {
            clients.insert(team_id.to_string(), HashMap::new());
        }
        let players = clients.get_mut(team_id).unwrap();
        if !players.contains_key(player_id) {
            players.insert(player_id.to_string().into(), Vec::new());
        }
        let connections = players.get_mut(player_id).unwrap();
        let connection = Connection {
            session_id: session_id.to_string(),
            player_id: player_id.to_string(),
            sender: tx,
        };
        connections.push(connection);
        rx
    }

    pub async fn broadcast(&self, message: Message) {
        let clients = self.clients.clone();
        let message = Arc::new(message);
        tokio::spawn(async move {
            let mut clients = clients.lock().unwrap();
            let players = clients.get_mut(&message.team_id).unwrap();
            // I would like to remove dropped connections and remove empty Vec too
            // It works with unbounded channels because I don't need to ".await" on it, now with bounded channel I need to await because it can be full
            players.retain(|_, emitters| {
                emitters.retain(|emitter| {
                    // Better logic here...
                    emitter.sender.send(message.clone()).await.is_ok()
                });
                !emitters.is_empty()
            });
        });
    }
}
A: No answers yet
Comments
tokio::runtime::Handle::current().block_on(emitter.sender.send(message.clone())).is_ok()
`block_on` blocks on the future until it completes. This obviously loses the benefit of being `async`, but if you really need that, you'll have to use data structures that support it; `retain` on `HashMap` and `Vec` does not.