如何在 .retain 回调中 .await?

How to .await inside .retain callback?

提问人:Fred Hors 提问时间:10/17/2023 更新时间:10/18/2023 访问量:53

问:

我正在将以下代码与无界的 mpsc 通道一起使用,它运行得非常好。

现在我需要使用有界的 mpsc 通道,但它不再起作用,因为现在我需要“”打开,因为它可能已满。.await.send()

由于我想删除我正在使用的空键和空键,不幸的是它不支持异步回调:VecHashMapretain

pub async fn broadcast(&self, message: Message) {
    let clients = self.clients.clone();

    let message = Arc::new(message);

    tokio::spawn(async move {
        let mut clients = clients.lock().unwrap();

        let players = clients.get_mut(&message.team_id).unwrap();

        // I would like to remove dropped connections and remove empty Vec too below

        // It works with unbounded channels because I don't need to ".await" on it, now with bounded channel I need to ".await" because it can be full

        players.retain(|_, emitters| {
            emitters.retain(|emitter| {
                // Better logic here...
                emitter.sender.send(message.clone()).await.is_ok()
            });

            !emitters.is_empty()
        });
    });
}

REPL:https://www.rustexplorer.com/b/hh5m79

法典:

/*
[dependencies]
axum = { version = "0.6.20" }
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1.32.0", default-features = false, features = [
    "macros",
    "process",
    "rt-multi-thread",
] }
tokio-stream = { version = "0.1.14", default-features = false, features = [
    "sync",
] }
*/

use axum::{
    extract::State,
    response::{
        sse::{Event, KeepAlive, Sse},
        Html,
    },
    routing::get,
    Router,
};
use futures::stream::Stream;
use std::{
    collections::HashMap,
    convert::Infallible,
    sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

type TeamId = String;
type PlayerId = String;

#[derive(Default)]
pub struct Broadcaster {
    clients: Arc<Mutex<HashMap<TeamId, HashMap<PlayerId, Vec<Connection>>>>>,
}

pub struct Connection {
    session_id: String,
    player_id: String,
    sender: mpsc::Sender<Arc<Message>>,
}

pub struct Message {
    pub team_id: TeamId,
    pub session_id: String,
    pub message: String,
}

struct AppState {
    broadcaster: Arc<Broadcaster>,
}

#[tokio::main]
async fn main() {
    let broadcaster = Arc::new(Broadcaster::default());

    let app_state = Arc::new(AppState { broadcaster });

    let app = Router::new()
        .route("/send_message", get(send_message))
        .route("/sse", get(sse_handler))
        .with_state(app_state);

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn send_message(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    let new_fake_message = Message {
        team_id: "fake_one".to_string(),
        session_id: "fake_one".to_string(),
        message: "fake_one".to_string(),
    };

    app_state.broadcaster.broadcast(new_fake_message).await;

    Html("Message sent")
}

async fn sse_handler(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = app_state
        .broadcaster
        .add_client("fake_one", "fake_one", "fake_one")
        .await;

    let mystream = ReceiverStream::<Arc<Message>>::new(rx)
        .map(|res| Ok(Event::default().data(res.message.to_string())));

    Sse::new(mystream).keep_alive(KeepAlive::default())
}

impl Broadcaster {
    pub async fn add_client(
        &self,
        session_id: &str,
        team_id: &str,
        player_id: &str,
    ) -> mpsc::Receiver<Arc<Message>> {
        let (tx, rx) = mpsc::channel::<Arc<Message>>(10);

        let mut clients = self.clients.lock().unwrap();

        if !clients.contains_key(team_id) {
            clients.insert(team_id.to_string(), HashMap::new());
        }

        let players = clients.get_mut(team_id).unwrap();

        if !players.contains_key(player_id) {
            players.insert(player_id.to_string().into(), Vec::new());
        }

        let connections = players.get_mut(player_id).unwrap();

        let connection = Connection {
            session_id: session_id.to_string(),
            player_id: player_id.to_string(),
            sender: tx,
        };

        connections.push(connection);

        rx
    }

    pub async fn broadcast(&self, message: Message) {
        let clients = self.clients.clone();

        let message = Arc::new(message);

        tokio::spawn(async move {
            let mut clients = clients.lock().unwrap();

            let players = clients.get_mut(&message.team_id).unwrap();

            // I would like to remove dropped connections and remove empty Vec too

            // It works with unbounded channels because I don't need to ".await" on it, now with bounded channel I need to await because it can be full

            players.retain(|_, emitters| {
                emitters.retain(|emitter| {
                    // Better logic here...
                    emitter.sender.send(message.clone()).await.is_ok()
                });

                !emitters.is_empty()
            });
        });
    }
}
rust async-await 回调

评论

0赞 isaactfa 10/17/2023
您可以使用 tokio::runtime::Handle::current().block_on: 阻止未来,直到它完成。这显然失去了存在的好处,但如果你真的需要它,你将不得不使用支持它的数据结构。tokio::runtime::Handle::current().block_on(emitter.sender.send(message.clone())).is_ok()async
0赞 Fred Hors 10/17/2023
谢谢。你是什么意思?you'll have to use data structures that support it
0赞 isaactfa 10/17/2023
我只是说支持异步突变的数据结构。
0赞 Fred Hors 10/17/2023
我可以改变一切。但我仍然不明白你在这种特定情况下是什么意思。我应该更改什么数据结构?哈希图?我正在使用 Arc 和互斥锁。我错在哪里?
0赞 cafce25 10/17/2023
但是不支持异步,所以你必须改成支持异步的东西(或者如果它尚不存在,则自己实现)HashMapVecretain

答: 暂无答案