如何在 rust 中触发异步回调

How to fire async callback in rust

提问人:lulijun 提问时间:11/29/2022 最后编辑:lulijun 更新时间:11/29/2022 访问量:728

问:

我正在尝试在 Rust 中实现 StateMachine,但是在尝试在生成线程中触发 StateMachine 的回调时遇到了一些问题。

这是我的 StateMachine 结构。状态是一个通用的 T,因为我想在许多不同的场景中使用它,并且我使用 Vec 来存储所有注册到这个 StateMachine 中的回调。

一开始,我没有使用生存期 'a,但它会遇到一些生存期问题,所以我通过这个建议添加了生存期 'a: Rust 中的惯用回调

pub struct StateMachine<'a, T> where T:Clone+Eq+'a {
    state: RwLock<T>,
    listeners2: Vec<Arc<Mutex<ListenerCallback<'a, T>>>>,
}

pub type ListenerCallback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a ;

当状态发生变化时,StateMachine 将触发所有回调,如下所示。

pub async fn try_set(&mut self, new_state:T) -> Result<()> {
        if (block_on(self.state.read()).deref().eq(&new_state)) {
            return Ok(())
        }
        // todo change the state

        // fire every listener in spawn
        let mut fire_results = vec![];
        for listener in &mut self.listeners2 {
            let state = new_state.clone();
            let fire_listener = listener.clone();
            fire_results.push(tokio::spawn(async move {
                let mut guard  = fire_listener.lock().unwrap();
                guard.deref_mut()(state);
            }));
        }
        // if fire result return Err, return it
        for fire_result in fire_results {
            fire_result.await?;
        }
        Ok(())
    }

但这会导致编译错误。

error[E0521]: borrowed data escapes outside of associated function
  --> src/taf/taf-core/src/execution/state_machine.rs:54:33
   |
15 | impl<'a,T> StateMachine<'a,T> where T:Clone+Eq+Send {
   |      -- lifetime `'a` defined here
...
34 |     pub async fn try_set(&mut self, new_state:T) -> Result<()> {
   |                          --------- `self` is a reference that is only valid in the associated function body
...
54 |             let fire_listener = listener.clone();
   |                                 ^^^^^^^^^^^^^^^^
   |                                 |
   |                                 `self` escapes the associated function body here
   |                                 argument requires that `'a` must outlive `'static`

##########################################################

完整的代码加上很多业务逻辑,所以我重写了 2 个 demo 如下,问题是一样的。第一个 demo 同步触发回调并工作正常,第二个 demo 尝试异步触发回调,它遇到了同样的问题:这里转义了关联的函数体。self

第一个演示(有效):

use std::alloc::alloc;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock};
use anyhow::Result;
use dashmap::DashMap;

struct StateMachine<'a,T> where T:Clone+Eq+'a {
    state: T,
    listeners: Vec<Box<Callback<'a, T>>>,
}

type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;

impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+'a {

    pub fn new(init_state: T) -> Self {
        StateMachine {
            state: init_state,
            listeners: vec![]
        }
    }

    pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
        self.listeners.push(listener);
        Ok(())
    }

    pub fn set(&mut self, new_state: T) -> Result<()> {

        self.state = new_state.clone();

        for listener in &mut self.listeners {
            listener(new_state.clone());
        }
        Ok(())
    }
}

#[derive(Clone, Eq, PartialEq, Hash)]
enum ExeState {
    Waiting,
    Running,
    Finished,
    Failed,
}

struct Execution<'a> {
    exec_id: String,
    pub state_machine: StateMachine<'a, ExeState>,
}

struct ExecManager<'a> {
    all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
    finished_jobs: Arc<RwLock<Vec<String>>>,
}

impl<'a> ExecManager<'a> {

    pub fn new() -> Self {
        ExecManager {
            all_jobs: Arc::new(RwLock::new(DashMap::new())),
            finished_jobs: Arc::new(RwLock::new(vec![]))
        }
    }

    fn add_job(&mut self, job_id: String) {
        let mut execution = Execution {
            exec_id: job_id.clone(),
            state_machine: StateMachine::new(ExeState::Waiting)
        };

        // add listener
        let callback_finished_jobs = self.finished_jobs.clone();
        let callback_job_id = job_id.clone();
        execution.state_machine.add_listener( Box::new(move |new_state| {
            println!("listener fired!, job_id {}", callback_job_id.clone());
            if new_state == ExeState::Finished || new_state == ExeState::Failed {
                let mut guard = callback_finished_jobs.write().unwrap();
                guard.deref_mut().push(callback_job_id.clone());

            }
            Ok(())
        }));

        let mut guard = self.all_jobs.write().unwrap();
        guard.deref_mut().insert(job_id, execution);
    }

    fn mock_exec(&mut self, job_id: String) {
        let mut guard = self.all_jobs.write().unwrap();
        let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();

        exec.state_machine.set(ExeState::Finished);
    }

}


#[test]
fn test() {
    let mut manager = ExecManager::new();

    manager.add_job(String::from("job_id1"));
    manager.add_job(String::from("job_id2"));

    manager.mock_exec(String::from("job_id1"));
    manager.mock_exec(String::from("job_id2"));


}

第二个演示:

use std::alloc::alloc;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock};
use anyhow::Result;
use dashmap::DashMap;
use petgraph::algo::astar;

struct StateMachine<'a,T> where T:Clone+Eq+Send+'a {
    state: T,
    listeners: Vec<Arc<Mutex<Box<Callback<'a, T>>>>>,
}

type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;

impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {

    pub fn new(init_state: T) -> Self {
        StateMachine {
            state: init_state,
            listeners: vec![]
        }
    }

    pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
        self.listeners.push(Arc::new(Mutex::new(listener)));
        Ok(())
    }

    pub fn set(&mut self, new_state: T) -> Result<()> {

        self.state = new_state.clone();

        for listener in &mut self.listeners {
            let spawn_listener = listener.clone();
            tokio::spawn(async move {
                let mut guard = spawn_listener.lock().unwrap();
                guard.deref_mut()(new_state.clone());
            });
        }
        Ok(())
    }
}

#[derive(Clone, Eq, PartialEq, Hash)]
enum ExeState {
    Waiting,
    Running,
    Finished,
    Failed,
}

struct Execution<'a> {
    exec_id: String,
    pub state_machine: StateMachine<'a, ExeState>,
}

struct ExecManager<'a> {
    all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
    finished_jobs: Arc<RwLock<Vec<String>>>,
}

impl<'a> ExecManager<'a> {

    pub fn new() -> Self {
        ExecManager {
            all_jobs: Arc::new(RwLock::new(DashMap::new())),
            finished_jobs: Arc::new(RwLock::new(vec![]))
        }
    }

    fn add_job(&mut self, job_id: String) {
        let mut execution = Execution {
            exec_id: job_id.clone(),
            state_machine: StateMachine::new(ExeState::Waiting)
        };

        // add listener
        let callback_finished_jobs = self.finished_jobs.clone();
        let callback_job_id = job_id.clone();
        execution.state_machine.add_listener( Box::new(move |new_state| {
            println!("listener fired!, job_id {}", callback_job_id.clone());
            if new_state == ExeState::Finished || new_state == ExeState::Failed {
                let mut guard = callback_finished_jobs.write().unwrap();
                guard.deref_mut().push(callback_job_id.clone());

            }
            Ok(())
        }));

        let mut guard = self.all_jobs.write().unwrap();
        guard.deref_mut().insert(job_id, execution);
    }

    fn mock_exec(&mut self, job_id: String) {
        let mut guard = self.all_jobs.write().unwrap();
        let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();

        exec.state_machine.set(ExeState::Finished);
    }

}


#[test]
fn test() {
    let mut manager = ExecManager::new();

    manager.add_job(String::from("job_id1"));
    manager.add_job(String::from("job_id2"));

    manager.mock_exec(String::from("job_id1"));
    manager.mock_exec(String::from("job_id2"));


}

第二个演示的编译错误:

error[E0521]: borrowed data escapes outside of associated function
  --> generic/src/callback2.rs:34:34
   |
15 | impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {
   |      -- lifetime `'a` defined here
...
29 |     pub fn set(&mut self, new_state: T) -> Result<()> {
   |                --------- `self` is a reference that is only valid in the associated function body
...
34 |             let spawn_listener = listener.clone();
   |                                  ^^^^^^^^^^^^^^^^
   |                                  |
   |                                  `self` escapes the associated function body here
   |                                  argument requires that `'a` must outlive `'static`
   |
   = note: requirement occurs because of the type `std::sync::Mutex<Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>>`, which makes the generic argument `Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>` invariant
   = note: the struct `std::sync::Mutex<T>` is invariant over the parameter `T`
   = help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance
Rust 回调 闭包

评论

0赞 Nikolay Zakirov 11/29/2022
你介意分享你的完整代码吗?
0赞 lulijun 11/29/2022
@NikolayZakirov 完整的代码加上很多业务逻辑,所以我重写了 2 个演示并将它们添加到问题中。第一个同步触发回调并工作,sceond 一个尝试异步触发回调,它遇到了同样的问题。

答:

1赞 Kevin Reid 11/29/2022 #1

生成的任务不能使用借来的数据(这里是具有生存期的数据,无论它是什么)。这是因为目前没有(而且可能永远不会)有任何方法可以保证借用的数据可靠地超过生成的任务。tokio::spawn()'a

您有两种选择:

  1. 在不生成的情况下触发通知。您可以将通知期货放入 FuturesUnordered 中以同时运行它们,但它们仍然必须在完成之前完成。try_set()

  2. 删除 lifetime 参数;停止允许借用数据的回调。必要时穿上你的类型。更改 的用户,以便他们不会尝试使用借来的数据,而是在必要时改用。'staticdynStateMachineArc

    pub struct StateMachine<T> where T: Clone + Eq + 'static {
       state: RwLock<T>,
       listeners2: Vec<Arc<Mutex<ListenerCallback<T>>>>,
    }
    
    pub type ListenerCallback<T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'static;
    

评论

0赞 lulijun 11/30/2022
我还有两个难题:1.编译错误在 tokio::spawn 之前的“listener.clone()”,我克隆了一个 Arc 变量并将其移动到 tokio::spwan。我不知道为什么它与自我有关联,这是否意味着即使我克隆了自我的变量,克隆的对象仍然会有一个借来的自我?换句话说,没有办法执行与自变量有任何关联的 tokio spwan 任务,即使是克隆的
0赞 lulijun 11/30/2022
2. 一开始,编译器的建议是给 dyn 添加一个 'static lifetime',但是在实际执行过程中,dyn 的生命周期应该和 StateMachine 一样,StateMachine 的寿命应该和 Execution 一样,但它们的寿命应该比 ExecManager 短,ExecManager 是一个长寿命服务。那么,如果我在dyn中添加一个'static',会对实际执行过程产生一些影响吗?
0赞 Kevin Reid 11/30/2022
@lulijun我不知道为什么会确切地提到这个错误,但这并不重要。至于“如果我在dyn中添加一个'static',它会对实际的执行过程产生一些影响吗?”——不,它不会。生存期永远不会改变程序的运行方式。事实上,从理论上讲,编译一个完全忽略生命周期的 Rust 程序是可能的,它也会做同样的事情——只是没有静态检查它是否健全。self
0赞 lulijun 11/30/2022
谢谢!虽然我不完全理解编译器在这两种情况下所做的事情之间的区别,但将生存期更改为“静态工作”。