异步块创建的 future 不是“发送”

future created by async block is not `Send`

提问人:Vana 提问时间:1/11/2023 更新时间:1/11/2023 访问量:6142

问:

我在 rust 中进行了服务器更新。它在 2 个二进制文件之间创建补丁,并提供静态文件

我试着做

    let mut update_state;
    if let Some(state) = update_stream.next().await {
        if let Ok(state) = state {
            update_state = state
        } else if let Err(err) = state {
            reply = BuildOutput { error: "Update failed: ".to_string() + &err.to_string() }
        }
    } else {
        reply = BuildOutput { error: "Unreacheable".to_string() }
    }

    let state = update_state.borrow();
    let progress = state.histogram.progress();

    let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;

但得到

note: future is not `Send` as this value is used across an await
   --> server\grpc\src\rpc.rs:260:50
    |
259 |         let mut update_state;
    |             ---------------- has type `SharedUpdateProgress` which is not `Send`
260 |         if let Some(state) = update_stream.next().await {
    |                                                  ^^^^^^ await occurs here, with `mut update_state` maybe used later
...
305 |     }
    |     - `mut update_state` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send

SharedUpdateProgress:

#[derive(Clone)]
pub struct SharedUpdateProgress {
    state: Rc<RefCell<UpdateProgress>>,
}

impl SharedUpdateProgress {
    pub fn new(target_revision: CleanName) -> Self {
        Self { state: Rc::new(RefCell::new(UpdateProgress::new(target_revision))) }
    }

    pub fn borrow(&self) -> Ref<'_, UpdateProgress> {
        self.state.borrow()
    }

    pub(crate) fn borrow_mut(&self) -> RefMut<'_, UpdateProgress> {
        self.state.borrow_mut()
    }
}

我不知道为什么,也不知道如何解决它

rust 异步等待 refcell

评论

1赞 Chayim Friedman 1/11/2023
请发布可复制的代码。
0赞 Jmb 1/11/2023
使用 an 代替Arc<Mutex>Rc<RefCell>

答:

3赞 Finomnis 1/11/2023 #1

我假设您的问题的最小可重现示例如下:

use std::{cell::RefCell, rc::Rc};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Rc<RefCell<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Rc::new(RefCell::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.borrow_mut() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.borrow());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}
error: future cannot be sent between threads safely
   --> src/main.rs:27:24
    |
27  |     tokio::task::spawn(run()).await.unwrap();
    |                        ^^^^^ future returned by `run` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<RefCell<String>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:19:36
    |
18  |     let shared_string = SharedString::new("Hello,");
    |         ------------- has type `SharedString` which is not `Send`
19  |     sleep(Duration::from_millis(1)).await;
    |                                    ^^^^^^ await occurs here, with `shared_string` maybe used later
...
23  | }
    | - `shared_string` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/martin/.cargo/git/checkouts/tokio-dd4afa005f1f4b79/686577b/tokio/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

tokio Runtime 通常是多线程的,这意味着在任何时候您的任务都可以从一个线程移动到另一个线程。这就是为什么跨一个点的所有东西都必须是 .这显然不是,因为它是一个单线程引用计数器。.await.awaitSendRc<RefCell<>>

溶液:替换为 ,这是线程安全的等效项。Rc<RefCell<>>Arc<Mutex<>>

use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Arc<Mutex<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Arc::new(Mutex::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.lock().unwrap() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.lock().unwrap());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}
"Hello, world!"

评论

0赞 Vana 1/11/2023
我替换了,但仍然有问题。完整的错误消息:Arc<Mutex>let state = update_state.lock(); | ----- has type `std::sync::MutexGuard<'_, UpdateProgress>` which is not `Send`
2赞 Finomnis 1/11/2023
呼叫时无法按住锁。你必须先释放它。如果要在 期间保持 if,则必须使用 的互斥锁。不过,它的速度较慢。.await.awaittokio
0赞 Vana 1/12/2023
如果我使用 tokio 互斥锁,我必须使用异步闭包来调用它,并且异步闭包不稳定:s
0赞 Finomnis 1/17/2023
@Vana 是和否;异步闭包实际上是不稳定的,但内部带有块的闭包则不稳定:play.rust-lang.org/...而且我个人还不了解异步闭包的使用,到目前为止,普通闭包中的异步块足以解决我的所有问题。包括您尝试使用互斥锁执行的任何事情。async
0赞 Vana 1/17/2023
是的,我将在我的闭包中使用异步正文而不是异步闭包