提问人:Hellagur 提问时间:7/30/2023 最后编辑:Hellagur 更新时间:8/12/2023 访问量:174
有没有办法让 notify 的防抖监视器(debounce watcher)变为异步?
Is there some way to make notify debounce watcher async?
问:
我在 notify 的代码仓库中也问了同样的问题:有没有办法使 debounce-watcher 异步?
但是,使用 `tokio::sync::mpsc::channel` 之后,程序仍然停留在 `while let Some(res) = rx.recv().await`,等待事件。
我已经查看了 notify 异步监视示例 `async_monitor` 的用法,不确定为什么它被卡住了,我是否遗漏了什么?
这是我的代码
use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher};
use std::{thread, path::Path, time::Duration};
use chrono::prelude::*;
use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult};
use tokio::runtime::{Runtime, Handle};
use tokio::sync::mpsc;
/// Returns a handle to a Tokio runtime, building a new runtime if none is current.
///
/// When the caller is already inside a runtime context, that runtime's handle
/// is returned and the second tuple element is `None`. Otherwise a new
/// [`Runtime`] is created and returned together with a clone of its handle, so
/// the caller decides how long the runtime stays alive.
///
/// # Panics
///
/// Panics if a new runtime is needed and `Runtime::new()` fails.
fn get_runtime_handle() -> (Handle, Option<Runtime>) {
    if let Ok(current) = Handle::try_current() {
        return (current, None);
    }
    let owned = Runtime::new().unwrap();
    let handle = owned.handle().clone();
    (handle, Some(owned))
}
/// Owns the debounced file-system watcher used by this program.
pub struct NotifyHandler {
/// The debouncer; `None` until `initialize_notify_scheduler` stores one, and
/// `None` again after `watch` moves it out with `take()`.
pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
}
impl NotifyHandler {
    /// Creates the debouncer, stores it on `self`, then prints every result
    /// received from the debouncer callback.
    ///
    /// If the debouncer cannot be created, the error is printed and the method
    /// returns. Otherwise the method keeps awaiting the channel and only
    /// returns once the channel is closed.
    ///
    /// NOTE(review): `main` awaits this method before doing anything else and
    /// never calls `watch`, so nothing is being watched while this loop waits.
    pub async fn initialize_notify_scheduler(&mut self) {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // The callback is synchronous, so it blocks on the async send.
        let forward = move |result: DebounceEventResult| {
            let (handle, _rt) = get_runtime_handle();
            handle.block_on(async {
                tx.send(result).await.unwrap();
            })
        };

        let watcher = match new_debouncer(Duration::from_secs(3), None, forward) {
            Ok(watcher) => watcher,
            Err(error) => {
                println!("{:?}", error);
                return;
            }
        };

        println!("Initialize notify watcher success");
        self.notify_watcher = Some(watcher);

        // Runs until `recv` yields `None`, i.e. until the channel is closed.
        while let Some(outcome) = rx.recv().await {
            match outcome {
                Err(errors) => println!("erros: {:?}", errors),
                Ok(events) => println!("events: {:?}", events),
            }
        }
    }

    /// Registers `path` (recursively) with the stored debouncer, if there is one.
    ///
    /// Prints whether the path exists and whether it is a file, and proceeds
    /// either way. Does nothing further when no debouncer is stored.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying watcher's `watch` call.
    pub fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);
        if !watch_path.exists() {
            println!("watch path {:?} not exists", watch_path);
        } else {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        }

        // `take()` moves the debouncer out of `self`, leaving `None` behind;
        // the moved value is dropped when this block ends.
        // NOTE(review): dropping the debouncer presumably stops watching - confirm.
        if let Some(mut debouncer) = self.notify_watcher.take() {
            debouncer
                .watcher()
                .watch(watch_path, RecursiveMode::Recursive)?;
            debouncer
                .cache()
                .add_root(watch_path, RecursiveMode::Recursive);
        }
        Ok(())
    }
}
#[tokio::main]
async fn main() {
let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None };
notifier.initialize_notify_scheduler().await;
loop {
thread::sleep(Duration::from_secs(2));
let time: DateTime<Local> = Local::now();
println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
}
}
这是我的另一版代码,没有使用 `block_on()`,而是使用 `await`,它有一些关于 `result` 和 `tx` 生命周期的问题。
use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error};
use std::{path::Path, time::Duration};
use chrono::prelude::*;
use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent};
use tokio::sync::mpsc::Receiver;
/// Owns the debounced file-system watcher and the channel end that receives
/// its results.
pub struct NotifyHandler {
/// The debouncer; `None` until `initialize_notify_scheduler` stores one, and
/// `None` again after `watch` moves it out with `take()`.
pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
/// Receiving half of the result channel; stored by
/// `initialize_notify_scheduler` and moved into a spawned task by `watch`.
pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>
}
impl NotifyHandler {
/// Creates the debouncer and the channel that carries its results.
///
/// On success the debouncer and the receiving half of the channel are stored
/// on `self`; on failure the error is printed and `self` is left unchanged.
pub async fn initialize_notify_scheduler(&mut self) {
let (tx, rx) = tokio::sync::mpsc::channel(1);
// The closure below is the debouncer's event handler.
let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
// `tx` is moved into the `async move` block, i.e. out of the closure's
// environment, which is why the compiler reports this closure as `FnOnce`
// while the handler must be `FnMut`.
// NOTE(review): `tokio::spawn` needs a runtime context on the calling
// thread - confirm which thread the debouncer invokes this handler on.
tokio::spawn(async move {
if let Err(e) = tx.send(result).await {
println!("Error sending event result: {:?}", e);
}
});
});
match debouncer {
Ok(watcher)=> {
println!("Initialize notify watcher success");
self.notify_watcher = Some(watcher);
self.receiver = Some(rx);
},
Err(error) => {
println!("{:?}", error);
}
}
}
/// Registers `path` (recursively) with the stored debouncer and spawns a task
/// that prints every result received on the channel.
///
/// Prints whether the path exists and whether it is a file, and proceeds
/// either way. Does nothing further when no debouncer is stored.
///
/// # Errors
///
/// Returns the error produced by the underlying watcher's `watch` call.
pub async fn watch(&mut self, path: &str) -> notify::Result<()> {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
// `take()` moves the debouncer out of `self`, leaving `None` behind; the
// moved value is dropped when this `if let` block ends.
// NOTE(review): dropping the debouncer presumably stops watching - confirm.
if let Some(mut watcher) = self.notify_watcher.take() {
watcher
.watcher()
.watch(watch_path, RecursiveMode::Recursive)?;
watcher
.cache()
.add_root(watch_path, RecursiveMode::Recursive);
// The receiver is moved into the task, so only the first call that
// reaches this point starts a receive loop.
if let Some(mut rx) = self.receiver.take() {
tokio::spawn(async move {
// Ends when `recv` yields `None`, i.e. when the channel is closed.
while let Some(res) = rx.recv().await {
match res {
Ok(events) => {
println!("events: {:?}", events);
},
Err(errors) => {
println!("errors: {:?}", errors)
}
}
}
});
}
}
Ok(())
}
}
#[tokio::main]
async fn main() {
let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None };
notifier.initialize_notify_scheduler().await;
notifier.watch("D:\\TEMP\\TestNote.txt").await.unwrap();
loop {
tokio::time::sleep(Duration::from_secs(3)).await;
let time: DateTime<Local> = Local::now();
println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
}
}
expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
required for `[closure@src\main.rs:16:69: 16:103]` to implement `DebounceEventHandler` (rustc)
main.rs(16, 69): the requirement to implement `FnMut` derives from here
main.rs(18, 33): closure is `FnOnce` because it moves the variable `tx` out of its environment
main.rs(16, 25): required by a bound introduced by this call
lib.rs(634, 25): required by a bound in `new_debouncer`
答:
0赞
Hellagur
8/12/2023
#1
非常感谢 @fakeshadow。对于任何可能遇到相同问题的人,关键是 `runtime`,下面是可以正常工作的代码。
[package]
name = "notify_test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4.26"
notify = { version = "6.0.1", default-features = false, features = ["macos_kqueue"] }
notify-debouncer-full = "0.2.0"
tokio = { version = "1", features = ["full"] }
use chrono::prelude::*;
use notify::{Error, ReadDirectoryChangesWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap,
};
use std::{path::Path, time::Duration};
use tokio::{runtime::Handle, sync::mpsc::Receiver};
/// Owns the debounced file-system watcher and the channel end that receives
/// its results.
pub struct NotifyHandler {
/// The debouncer; `None` until `initialize_notify_scheduler` stores one.
pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
/// Receiving half of the result channel; stored by
/// `initialize_notify_scheduler` and moved into a spawned task by `watch`.
pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
}
impl NotifyHandler {
    /// Creates the debouncer and the channel that carries its results.
    ///
    /// On success the debouncer and the receiving half of the channel are
    /// stored on `self`; on failure the error is printed and `self` is left
    /// unchanged.
    ///
    /// The handle of the current runtime is captured up front so that the
    /// event handler can spawn its send onto that runtime.
    pub async fn initialize_notify_scheduler(&mut self) {
        let (events_tx, events_rx) = tokio::sync::mpsc::channel(1);
        let runtime = Handle::current();

        // Each invocation clones the sender, so the closure keeps its own copy
        // and can be called repeatedly.
        let on_events = move |result: DebounceEventResult| {
            let sender = events_tx.clone();
            println!("calling by notify -> {:?}", &result);
            runtime.spawn(async move {
                if let Err(e) = sender.send(result).await {
                    println!("Error sending event result: {:?}", e);
                }
            });
        };

        match new_debouncer(Duration::from_secs(3), None, on_events) {
            Err(error) => {
                println!("{:?}", error);
            }
            Ok(watcher) => {
                println!("Initialize notify watcher success");
                self.notify_watcher = Some(watcher);
                self.receiver = Some(events_rx);
            }
        }
    }

    /// Registers `path` (recursively) with the stored debouncer and, the first
    /// time it succeeds, spawns a task that prints every received result.
    ///
    /// Prints whether the path exists and whether it is a file, and proceeds
    /// either way. Does nothing further when no debouncer is stored.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying watcher's `watch` call.
    pub async fn watch(&mut self, path: &str) -> notify::Result<()> {
        let watch_path = Path::new(path);
        if !watch_path.exists() {
            println!("watch path {:?} not exists", watch_path);
        } else {
            let is_file = watch_path.is_file();
            println!("Valid path {} is file {}", path, is_file);
        }

        // Borrow the debouncer in place so it stays stored on `self`.
        let debouncer = match self.notify_watcher.as_mut() {
            Some(debouncer) => debouncer,
            None => return Ok(()),
        };
        debouncer
            .watcher()
            .watch(watch_path, RecursiveMode::Recursive)?;
        debouncer
            .cache()
            .add_root(watch_path, RecursiveMode::Recursive);

        // The receiver is moved into the task, so later calls find `None`
        // here and do not start a second receive loop.
        if let Some(mut events_rx) = self.receiver.take() {
            tokio::spawn(async move {
                while let Some(outcome) = events_rx.recv().await {
                    match outcome {
                        Err(errors) => println!("errors: {:?}", errors),
                        Ok(events) => println!("events: {:?}", events),
                    }
                }
            });
        }
        Ok(())
    }
}
/// Entry point: initializes the notifier, starts watching one hard-coded
/// path, then prints a timestamped greeting every three seconds while the
/// spawned receive task reports file events.
///
/// # Panics
///
/// Panics if `watch` returns an error.
#[tokio::main]
async fn main() {
    let mut notifier: NotifyHandler = NotifyHandler {
        notify_watcher: None,
        receiver: None,
    };
    notifier.initialize_notify_scheduler().await;
    notifier
        .watch("D:\\Temp\\program\\test_md.txt")
        .await
        .expect("failed to start watching the path");
    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;
        let time: DateTime<Local> = Local::now();
        // The formatter returned by `format` implements `Display`, so it can be
        // printed directly without allocating an intermediate `String`.
        println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S"));
    }
}
评论
std::thread::sleep
tokio::time::sleep
spawn_blocking
block_on()
await
tokio
tokio::time::sleep
block_on()
futures::executor::block_on()
result
tx