-
-
Save Hellager/6bc77d610ff20932ccec379e20599083 to your computer and use it in GitHub Desktop.
Test template for rust notify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Cargo.toml | |
[package] | |
name = "notify_test" | |
version = "0.1.0" | |
edition = "2021" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
notify = { version = "6.0.1", default-features = false, features = ["macos_kqueue"] } | |
notify-debouncer-full = "0.2.0" | |
tokio = { version = "1", features = ["full"] } | |
* | |
*/ | |
use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error}; | |
use std::{path::Path, time::Duration}; | |
use notify_debouncer_full::{Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent}; | |
use tokio; | |
pub struct NotifyHandler { | |
pub notify_watcher: Option<ReadDirectoryChangesWatcher>, | |
pub debounce_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>, | |
pub std_sender: Option<std::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
pub std_receiver: Option<std::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
pub debounce_std_sender: Option<std::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
pub debounce_std_receiver: Option<std::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
pub notify_tokio_sender: Option<tokio::sync::mpsc::Sender<Result<notify::Event, notify::Error>>>, | |
pub notify_tokio_receiver: Option<tokio::sync::mpsc::Receiver<Result<notify::Event, notify::Error>>>, | |
pub debounce_tokio_sender: Option<tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>, | |
pub debounce_tokio_receiver: Option<tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>, | |
pub debounce_unbounded_sender: Option<tokio::sync::mpsc::UnboundedSender<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
pub debounce_unbounded_receiver: Option<tokio::sync::mpsc::UnboundedReceiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>, | |
} | |
impl NotifyHandler { | |
// For [2. With standard notify watcher in struct, without channel] | |
pub fn initialize_notify_watcher_without_timer(&mut self) { | |
println!("Try initialize notify watcher without timer"); | |
let watcher = notify::recommended_watcher(|res| { | |
match res { | |
Ok(event) => println!("event: {:?}", event), | |
Err(e) => println!("watch error: {:?}", e), | |
} | |
}).unwrap(); | |
self.notify_watcher = Some(watcher); | |
println!("Initialize notify watcher success!"); | |
} | |
// For [2. With standard notify watcher in struct, without channel] | |
pub fn notify_watcher_watch_without_timer(&mut self, path: &str) { | |
let watch_path = Path::new(path); | |
if watch_path.exists() { | |
let is_file = watch_path.is_file(); | |
println!("Valid path {} is file {}", path, is_file); | |
} else { | |
println!("watch path {:?} not exists", watch_path); | |
} | |
println!("Try add watch path to notify watcher"); | |
if let Some(mut watcher) = self.notify_watcher.take() { | |
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap(); | |
} | |
println!("Add watch path to notify watcher success!"); | |
} | |
// For [4. With standard notify watcher in async, in struct] | |
pub async fn initialize_notify_watcher_with_tokio_channel(&mut self) { | |
let (tx, rx) = tokio::sync::mpsc::channel(1); | |
println!("Try initialize notify watcher without timer"); | |
let watcher_tx = tx.clone(); | |
let watcher = notify::recommended_watcher(move |res| { | |
println!("Try send events"); | |
let _ = watcher_tx.blocking_send(res); | |
}).unwrap(); | |
self.notify_watcher = Some(watcher); | |
self.notify_tokio_sender = Some(tx); | |
self.notify_tokio_receiver = Some(rx); | |
println!("Initialize notify watcher success!"); | |
} | |
// For [4. With standard notify watcher in async, in struct] | |
pub async fn notify_watcher_watch_with_tokio_channel(&mut self, path: &str) { | |
let watch_path = Path::new(path); | |
if watch_path.exists() { | |
let is_file = watch_path.is_file(); | |
println!("Valid path {} is file {}", path, is_file); | |
} else { | |
println!("watch path {:?} not exists", watch_path); | |
} | |
println!("Try add watch path to notify watcher"); | |
if let Some(mut watcher) = self.notify_watcher.take() { | |
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap(); | |
} | |
println!("Add watch path to notify watcher success!"); | |
} | |
// For [6. With debounce wathcer in struct with std channel] | |
pub fn initialize_debounce_watcher_with_std_channel(&mut self) { | |
let (tx, rx) = std::sync::mpsc::channel(); | |
println!("Try initialize notify watcher without timer"); | |
let watcher_tx = tx.clone(); | |
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, watcher_tx).unwrap(); | |
self.debounce_watcher = Some(watcher); | |
self.debounce_std_sender = Some(tx); | |
self.debounce_std_receiver = Some(rx); | |
println!("Initialize notify watcher success!"); | |
} | |
// For [6. With debounce wathcer in struct with std channel] | |
pub fn debounce_watcher_watch_with_std_channel(&mut self, path: &str) { | |
let watch_path = Path::new(path); | |
if watch_path.exists() { | |
let is_file = watch_path.is_file(); | |
println!("Valid path {} is file {}", path, is_file); | |
} else { | |
println!("watch path {:?} not exists", watch_path); | |
} | |
println!("Try add watch path to notify watcher"); | |
if let Some(mut debouncer) = self.debounce_watcher.take() { | |
debouncer | |
.watcher() | |
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
debouncer | |
.cache() | |
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
} | |
if let Some(rx) = self.debounce_std_receiver.take() { | |
for result in rx { | |
match result { | |
Ok(events) => events.iter().for_each(|event| println!("{event:?}")), | |
Err(errors) => errors.iter().for_each(|error| println!("{error:?}")), | |
} | |
println!(); | |
} | |
} | |
println!("Add watch path to notify watcher success!"); | |
} | |
// For [8. With debounce wathcer in struct with tokio channel] | |
pub async fn initialize_debounce_watcher_with_tokio_channel(&mut self) { | |
let (tx, rx) = tokio::sync::mpsc::channel(1); | |
println!("Try initialize notify watcher without timer"); | |
let watcher_tx = tx.clone(); | |
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { | |
println!("Detect events"); | |
let _ = watcher_tx.send(result); | |
}).unwrap(); | |
self.debounce_watcher = Some(watcher); | |
self.debounce_tokio_sender = Some(tx); | |
self.debounce_tokio_receiver = Some(rx); | |
println!("Initialize notify watcher success!"); | |
} | |
// For [8. With debounce wathcer in struct with tokio channel] | |
pub async fn debounce_watcher_watch_with_tokio_channel(&mut self, path: &str) { | |
let watch_path = Path::new(path); | |
if watch_path.exists() { | |
let is_file = watch_path.is_file(); | |
println!("Valid path {} is file {}", path, is_file); | |
} else { | |
println!("watch path {:?} not exists", watch_path); | |
} | |
println!("Try add watch path to notify watcher"); | |
if let Some(mut debouncer) = self.debounce_watcher.take() { | |
debouncer | |
.watcher() | |
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
debouncer | |
.cache() | |
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
} | |
println!("Add watch path to notify watcher success!"); | |
println!("Try receive from sender"); | |
if let Some(mut rx) = self.debounce_tokio_receiver.take() { | |
while let Some(res) = rx.recv().await { | |
match res { | |
Ok(events) => { | |
println!("events: {:?}", events); | |
}, | |
Err(errors) => { | |
println!("errors: {:?}", errors) | |
} | |
} | |
} | |
} | |
} | |
// For [10. With debounce wathcer in struct with unbounded channel] | |
pub async fn initialize_debounce_watcher_with_unbounded_channel(&mut self) { | |
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); | |
println!("Try initialize notify watcher without timer"); | |
let watcher_tx = tx.clone(); | |
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { | |
println!("Detect events"); | |
let _ = watcher_tx.send(result); | |
}).unwrap(); | |
self.debounce_watcher = Some(watcher); | |
self.debounce_unbounded_sender = Some(tx); | |
self.debounce_unbounded_receiver = Some(rx); | |
println!("Initialize notify watcher success!"); | |
} | |
// For [10. With debounce wathcer in struct with unbounded channel] | |
pub async fn debounce_watcher_watch_with_bounded_channel(&mut self, path: &str) { | |
let watch_path = Path::new(path); | |
if watch_path.exists() { | |
let is_file = watch_path.is_file(); | |
println!("Valid path {} is file {}", path, is_file); | |
} else { | |
println!("watch path {:?} not exists", watch_path); | |
} | |
println!("Try add watch path to notify watcher"); | |
if let Some(mut debouncer) = self.debounce_watcher.take() { | |
debouncer | |
.watcher() | |
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
debouncer | |
.cache() | |
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
} | |
if let Some(mut rx) = self.debounce_unbounded_receiver.take() { | |
tokio::spawn(async move { | |
while let Some(res) = rx.recv().await { | |
match res { | |
Ok(events) => { | |
println!("events: {:?}", events); | |
}, | |
Err(errors) => { | |
println!("errors: {:?}", errors) | |
} | |
} | |
} | |
}); | |
} | |
println!("Add watch path to notify watcher success!"); | |
} | |
} | |
// // 1. With standard notify watcher in main -> partially failed (no output from the main loop, but events were printed)
// fn main() { | |
// println!("Case 1: With standard notify watcher in main"); | |
// // Automatically select the best implementation for your platform. | |
// println!("Try initialize notify watcher"); | |
// let mut watcher = notify::recommended_watcher(|res| { | |
// match res { | |
// Ok(event) => println!("event: {:?}", event), | |
// Err(e) => println!("watch error: {:?}", e), | |
// } | |
// }).unwrap(); | |
// println!("Initialize notify watcher success!"); | |
// // Add a path to be watched. All files and directories at that path and | |
// // below will be monitored for changes. | |
// println!("Try add watch path to notify watcher"); | |
// watcher.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
// println!("Add watch path to notify watcher success!"); | |
// // No output in main loop, had output for events | |
// loop { | |
// std::thread::sleep(Duration::from_secs(10)); | |
// debug!("Program running"); | |
// } | |
// } | |
// // 2. With standard notify watcher in struct, without channel -> failed! [no events sent]
// fn main() { | |
// println!("Case 2: With standard notify watcher in struct, without channel"); | |
// let mut notifier: NotifyHandler = NotifyHandler { | |
// notify_watcher: None, | |
// debounce_watcher: None, | |
// std_sender: None, | |
// std_receiver: None, | |
// debounce_std_sender: None, | |
// debounce_std_receiver: None, | |
// notify_tokio_sender: None, | |
// notify_tokio_receiver: None, | |
// debounce_tokio_sender: None, | |
// debounce_tokio_receiver: None, | |
// debounce_unbounded_sender: None, | |
// debounce_unbounded_receiver: None | |
// }; | |
// notifier.initialize_notify_watcher_without_timer(); | |
// notifier.notify_watcher_watch_without_timer("D:\\TEMP\\TestNote.txt"); | |
// println!("Try get into main loop"); | |
// // No output in loop | |
// loop { | |
// std::thread::sleep(Duration::from_secs(10)); | |
// println!("Program running"); | |
// } | |
// } | |
// // 3. With standard notify watcher in async, without struct -> success! | |
// #[tokio::main] | |
// async fn main() { | |
// println!("Case 3: With standard notify watcher in async, without struct"); | |
// let (tx, mut rx) = tokio::sync::mpsc::channel(1); | |
// let mut watcher = notify::RecommendedWatcher::new( | |
// move |res| { | |
// let _ = tx.blocking_send(res); | |
// }, | |
// notify::Config::default(), | |
// ).unwrap(); | |
// watcher.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
// while let Some(res) = rx.recv().await { | |
// match res { | |
// Ok(event) => println!("changed: {:?}", event), | |
// Err(e) => println!("watch error: {:?}", e), | |
// } | |
// } | |
// } | |
// // 4. With standard notify watcher in async, in struct -> failed (no output for events)
// #[tokio::main] | |
// async fn main() { | |
// println!("Case 4: With standard notify watcher in async, in struct"); | |
// let mut notifier: NotifyHandler = NotifyHandler { | |
// notify_watcher: None, | |
// debounce_watcher: None, | |
// std_sender: None, | |
// std_receiver: None, | |
// debounce_std_sender: None, | |
// debounce_std_receiver: None, | |
// notify_tokio_sender: None, | |
// notify_tokio_receiver: None, | |
// debounce_tokio_sender: None, | |
// debounce_tokio_receiver: None, | |
// debounce_unbounded_sender: None, | |
// debounce_unbounded_receiver: None | |
// }; | |
// notifier.initialize_notify_watcher_with_tokio_channel().await; | |
// notifier.notify_watcher_watch_with_tokio_channel("D:\\TEMP\\TestNote.txt").await; | |
// loop { | |
// tokio::time::sleep(Duration::from_secs(5)).await; | |
// println!("Program running"); | |
// } | |
// } | |
// // 5. With debounce watcher in main with std channel -> success!
// fn main() { | |
// println!("Case 5: With debounce wathcer in main with std channel"); | |
// let (tx, rx) = std::sync::mpsc::channel(); | |
// println!("Try initialize notify watcher"); | |
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, tx).unwrap(); | |
// println!("Initialize notify watcher success!"); | |
// // Add a path to be watched. All files and directories at that path and | |
// // below will be monitored for changes. | |
// println!("Try add watch path to notify watcher"); | |
// debouncer | |
// .watcher() | |
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
// debouncer | |
// .cache() | |
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
// println!("Add watch path to notify watcher success!"); | |
// for result in rx { | |
// match result { | |
// Ok(events) => events.iter().for_each(|event| println!("{event:?}")), | |
// Err(errors) => errors.iter().for_each(|error| println!("{error:?}")), | |
// } | |
// println!(); | |
// } | |
// } | |
// // 6. With debounce watcher in struct with std channel -> failed! [stuck at receiver]
// fn main() { | |
// println!("Case 6: With debounce wathcer in struct with std channel"); | |
// let mut notifier: NotifyHandler = NotifyHandler { | |
// notify_watcher: None, | |
// debounce_watcher: None, | |
// std_sender: None, | |
// std_receiver: None, | |
// debounce_std_sender: None, | |
// debounce_std_receiver: None, | |
// notify_tokio_sender: None, | |
// notify_tokio_receiver: None, | |
// debounce_tokio_sender: None, | |
// debounce_tokio_receiver: None, | |
// debounce_unbounded_sender: None, | |
// debounce_unbounded_receiver: None | |
// }; | |
// notifier.initialize_debounce_watcher_with_std_channel(); | |
// notifier.debounce_watcher_watch_with_std_channel("D:\\TEMP\\TestNote.txt"); | |
// loop { | |
// std::thread::sleep(Duration::from_secs(5)); | |
// println!("Program running"); | |
// } | |
// } | |
// // 7. With debounce watcher in main with tokio channel -> failed! [no events sent]
// #[tokio::main] | |
// async fn main() { | |
// println!("Case 7: With debounce wathcer in main with tokio channel"); | |
// let (tx, mut rx) = tokio::sync::mpsc::channel(1); | |
// println!("Try initialize notify watcher"); | |
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { | |
// let _ = tx.send(result); | |
// }).unwrap(); | |
// println!("Initialize notify watcher success!"); | |
// // Add a path to be watched. All files and directories at that path and | |
// // below will be monitored for changes. | |
// println!("Try add watch path to notify watcher"); | |
// debouncer | |
// .watcher() | |
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
// debouncer | |
// .cache() | |
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
// println!("Add watch path to notify watcher success!"); | |
// while let Some(events) = rx.recv().await { | |
// println!("{events:?}"); | |
// } | |
// } | |
// // 8. With debounce watcher in struct with tokio channel -> failed! [no events sent, stuck in the receive loop]
// #[tokio::main] | |
// async fn main() { | |
// println!("Case 8: With debounce wathcer in struct with tokio channel"); | |
// let mut notifier: NotifyHandler = NotifyHandler { | |
// notify_watcher: None, | |
// debounce_watcher: None, | |
// std_sender: None, | |
// std_receiver: None, | |
// debounce_std_sender: None, | |
// debounce_std_receiver: None, | |
// notify_tokio_sender: None, | |
// notify_tokio_receiver: None, | |
// debounce_tokio_sender: None, | |
// debounce_tokio_receiver: None, | |
// debounce_unbounded_sender: None, | |
// debounce_unbounded_receiver: None | |
// }; | |
// notifier.initialize_debounce_watcher_with_tokio_channel().await; | |
// notifier.debounce_watcher_watch_with_tokio_channel("D:\\TEMP\\TestNote.txt").await; | |
// loop { | |
// tokio::time::sleep(Duration::from_secs(5)).await; | |
// println!("Program running"); | |
// } | |
// } | |
// // 9. With debounce watcher in main with unbounded channel -> success!
// #[tokio::main] | |
// async fn main() { | |
// println!("Case 9: With debounce wathcer in main with unbounded channel"); | |
// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); | |
// println!("Try initialize notify watcher"); | |
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { | |
// let _ = tx.send(result); | |
// }).unwrap(); | |
// println!("Initialize notify watcher success!"); | |
// // Add a path to be watched. All files and directories at that path and | |
// // below will be monitored for changes. | |
// println!("Try add watch path to notify watcher"); | |
// debouncer | |
// .watcher() | |
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap(); | |
// debouncer | |
// .cache() | |
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive); | |
// println!("Add watch path to notify watcher success!"); | |
// while let Some(events) = rx.recv().await { | |
// println!("{events:?}"); | |
// } | |
// } | |
// 10. With debounce wathcer in struct with unbounded channel -> failed![no events send] | |
#[tokio::main] | |
async fn main() { | |
println!("Case 10: With debounce wathcer in struct with unbounded channel"); | |
let mut notifier: NotifyHandler = NotifyHandler { | |
notify_watcher: None, | |
debounce_watcher: None, | |
std_sender: None, | |
std_receiver: None, | |
debounce_std_sender: None, | |
debounce_std_receiver: None, | |
notify_tokio_sender: None, | |
notify_tokio_receiver: None, | |
debounce_tokio_sender: None, | |
debounce_tokio_receiver: None, | |
debounce_unbounded_sender: None, | |
debounce_unbounded_receiver: None | |
}; | |
notifier.initialize_debounce_watcher_with_unbounded_channel().await; | |
notifier.debounce_watcher_watch_with_bounded_channel("D:\\TEMP\\TestNote.txt").await; | |
loop { | |
tokio::time::sleep(Duration::from_secs(5)).await; | |
println!("Program running"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment