Skip to content

Instantly share code, notes, and snippets.

@Hellager

Hellager/main.rs Secret

Created August 10, 2023 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Hellager/6bc77d610ff20932ccec379e20599083 to your computer and use it in GitHub Desktop.
Save Hellager/6bc77d610ff20932ccec379e20599083 to your computer and use it in GitHub Desktop.
Test template for rust notify
/**
* Cargo.toml
[package]
name = "notify_test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
notify = { version = "6.0.1", default-features = false, features = ["macos_kqueue"] }
notify-debouncer-full = "0.2.0"
tokio = { version = "1", features = ["full"] }
*
*/
use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error};
use std::{path::Path, time::Duration};
use notify_debouncer_full::{Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent};
use tokio;
pub struct NotifyHandler {
pub notify_watcher: Option<ReadDirectoryChangesWatcher>,
pub debounce_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
pub std_sender: Option<std::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
pub std_receiver: Option<std::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
pub debounce_std_sender: Option<std::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
pub debounce_std_receiver: Option<std::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
pub notify_tokio_sender: Option<tokio::sync::mpsc::Sender<Result<notify::Event, notify::Error>>>,
pub notify_tokio_receiver: Option<tokio::sync::mpsc::Receiver<Result<notify::Event, notify::Error>>>,
pub debounce_tokio_sender: Option<tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
pub debounce_tokio_receiver: Option<tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
pub debounce_unbounded_sender: Option<tokio::sync::mpsc::UnboundedSender<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
pub debounce_unbounded_receiver: Option<tokio::sync::mpsc::UnboundedReceiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>,
}
impl NotifyHandler {
// For [2. With standard notify watcher in struct, without channel]
pub fn initialize_notify_watcher_without_timer(&mut self) {
println!("Try initialize notify watcher without timer");
let watcher = notify::recommended_watcher(|res| {
match res {
Ok(event) => println!("event: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
}).unwrap();
self.notify_watcher = Some(watcher);
println!("Initialize notify watcher success!");
}
// For [2. With standard notify watcher in struct, without channel]
pub fn notify_watcher_watch_without_timer(&mut self, path: &str) {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
println!("Try add watch path to notify watcher");
if let Some(mut watcher) = self.notify_watcher.take() {
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap();
}
println!("Add watch path to notify watcher success!");
}
// For [4. With standard notify watcher in async, in struct]
pub async fn initialize_notify_watcher_with_tokio_channel(&mut self) {
let (tx, rx) = tokio::sync::mpsc::channel(1);
println!("Try initialize notify watcher without timer");
let watcher_tx = tx.clone();
let watcher = notify::recommended_watcher(move |res| {
println!("Try send events");
let _ = watcher_tx.blocking_send(res);
}).unwrap();
self.notify_watcher = Some(watcher);
self.notify_tokio_sender = Some(tx);
self.notify_tokio_receiver = Some(rx);
println!("Initialize notify watcher success!");
}
// For [4. With standard notify watcher in async, in struct]
pub async fn notify_watcher_watch_with_tokio_channel(&mut self, path: &str) {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
println!("Try add watch path to notify watcher");
if let Some(mut watcher) = self.notify_watcher.take() {
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap();
}
println!("Add watch path to notify watcher success!");
}
// For [6. With debounce wathcer in struct with std channel]
pub fn initialize_debounce_watcher_with_std_channel(&mut self) {
let (tx, rx) = std::sync::mpsc::channel();
println!("Try initialize notify watcher without timer");
let watcher_tx = tx.clone();
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, watcher_tx).unwrap();
self.debounce_watcher = Some(watcher);
self.debounce_std_sender = Some(tx);
self.debounce_std_receiver = Some(rx);
println!("Initialize notify watcher success!");
}
// For [6. With debounce wathcer in struct with std channel]
pub fn debounce_watcher_watch_with_std_channel(&mut self, path: &str) {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
println!("Try add watch path to notify watcher");
if let Some(mut debouncer) = self.debounce_watcher.take() {
debouncer
.watcher()
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
debouncer
.cache()
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
}
if let Some(rx) = self.debounce_std_receiver.take() {
for result in rx {
match result {
Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
}
println!();
}
}
println!("Add watch path to notify watcher success!");
}
// For [8. With debounce wathcer in struct with tokio channel]
pub async fn initialize_debounce_watcher_with_tokio_channel(&mut self) {
let (tx, rx) = tokio::sync::mpsc::channel(1);
println!("Try initialize notify watcher without timer");
let watcher_tx = tx.clone();
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
println!("Detect events");
let _ = watcher_tx.send(result);
}).unwrap();
self.debounce_watcher = Some(watcher);
self.debounce_tokio_sender = Some(tx);
self.debounce_tokio_receiver = Some(rx);
println!("Initialize notify watcher success!");
}
// For [8. With debounce wathcer in struct with tokio channel]
pub async fn debounce_watcher_watch_with_tokio_channel(&mut self, path: &str) {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
println!("Try add watch path to notify watcher");
if let Some(mut debouncer) = self.debounce_watcher.take() {
debouncer
.watcher()
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
debouncer
.cache()
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
}
println!("Add watch path to notify watcher success!");
println!("Try receive from sender");
if let Some(mut rx) = self.debounce_tokio_receiver.take() {
while let Some(res) = rx.recv().await {
match res {
Ok(events) => {
println!("events: {:?}", events);
},
Err(errors) => {
println!("errors: {:?}", errors)
}
}
}
}
}
// For [10. With debounce wathcer in struct with unbounded channel]
pub async fn initialize_debounce_watcher_with_unbounded_channel(&mut self) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
println!("Try initialize notify watcher without timer");
let watcher_tx = tx.clone();
let watcher = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
println!("Detect events");
let _ = watcher_tx.send(result);
}).unwrap();
self.debounce_watcher = Some(watcher);
self.debounce_unbounded_sender = Some(tx);
self.debounce_unbounded_receiver = Some(rx);
println!("Initialize notify watcher success!");
}
// For [10. With debounce wathcer in struct with unbounded channel]
pub async fn debounce_watcher_watch_with_bounded_channel(&mut self, path: &str) {
let watch_path = Path::new(path);
if watch_path.exists() {
let is_file = watch_path.is_file();
println!("Valid path {} is file {}", path, is_file);
} else {
println!("watch path {:?} not exists", watch_path);
}
println!("Try add watch path to notify watcher");
if let Some(mut debouncer) = self.debounce_watcher.take() {
debouncer
.watcher()
.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
debouncer
.cache()
.add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
}
if let Some(mut rx) = self.debounce_unbounded_receiver.take() {
tokio::spawn(async move {
while let Some(res) = rx.recv().await {
match res {
Ok(events) => {
println!("events: {:?}", events);
},
Err(errors) => {
println!("errors: {:?}", errors)
}
}
}
});
}
println!("Add watch path to notify watcher success!");
}
}
// // 1. With standard notify watcher in main -> half failed(No output in main loop, had output for events)
// fn main() {
// println!("Case 1: With standard notify watcher in main");
// // Automatically select the best implementation for your platform.
// println!("Try initialize notify watcher");
// let mut watcher = notify::recommended_watcher(|res| {
// match res {
// Ok(event) => println!("event: {:?}", event),
// Err(e) => println!("watch error: {:?}", e),
// }
// }).unwrap();
// println!("Initialize notify watcher success!");
// // Add a path to be watched. All files and directories at that path and
// // below will be monitored for changes.
// println!("Try add watch path to notify watcher");
// watcher.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
// println!("Add watch path to notify watcher success!");
// // No output in main loop, had output for events
// loop {
// std::thread::sleep(Duration::from_secs(10));
// debug!("Program running");
// }
// }
// // 2. With standard notify watcher in struct, without channel -> failed![no events send]
// fn main() {
// println!("Case 2: With standard notify watcher in struct, without channel");
// let mut notifier: NotifyHandler = NotifyHandler {
// notify_watcher: None,
// debounce_watcher: None,
// std_sender: None,
// std_receiver: None,
// debounce_std_sender: None,
// debounce_std_receiver: None,
// notify_tokio_sender: None,
// notify_tokio_receiver: None,
// debounce_tokio_sender: None,
// debounce_tokio_receiver: None,
// debounce_unbounded_sender: None,
// debounce_unbounded_receiver: None
// };
// notifier.initialize_notify_watcher_without_timer();
// notifier.notify_watcher_watch_without_timer("D:\\TEMP\\TestNote.txt");
// println!("Try get into main loop");
// // No output in loop
// loop {
// std::thread::sleep(Duration::from_secs(10));
// println!("Program running");
// }
// }
// // 3. With standard notify watcher in async, without struct -> success!
// #[tokio::main]
// async fn main() {
// println!("Case 3: With standard notify watcher in async, without struct");
// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
// let mut watcher = notify::RecommendedWatcher::new(
// move |res| {
// let _ = tx.blocking_send(res);
// },
// notify::Config::default(),
// ).unwrap();
// watcher.watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
// while let Some(res) = rx.recv().await {
// match res {
// Ok(event) => println!("changed: {:?}", event),
// Err(e) => println!("watch error: {:?}", e),
// }
// }
// }
// // 4. With standard notify watcher in async, in struct -> failed(no output for events)
// #[tokio::main]
// async fn main() {
// println!("Case 4: With standard notify watcher in async, in struct");
// let mut notifier: NotifyHandler = NotifyHandler {
// notify_watcher: None,
// debounce_watcher: None,
// std_sender: None,
// std_receiver: None,
// debounce_std_sender: None,
// debounce_std_receiver: None,
// notify_tokio_sender: None,
// notify_tokio_receiver: None,
// debounce_tokio_sender: None,
// debounce_tokio_receiver: None,
// debounce_unbounded_sender: None,
// debounce_unbounded_receiver: None
// };
// notifier.initialize_notify_watcher_with_tokio_channel().await;
// notifier.notify_watcher_watch_with_tokio_channel("D:\\TEMP\\TestNote.txt").await;
// loop {
// tokio::time::sleep(Duration::from_secs(5)).await;
// println!("Program running");
// }
// }
// // 5. With debounce wathcer in main with std channel -> success!
// fn main() {
// println!("Case 5: With debounce wathcer in main with std channel");
// let (tx, rx) = std::sync::mpsc::channel();
// println!("Try initialize notify watcher");
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, tx).unwrap();
// println!("Initialize notify watcher success!");
// // Add a path to be watched. All files and directories at that path and
// // below will be monitored for changes.
// println!("Try add watch path to notify watcher");
// debouncer
// .watcher()
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
// debouncer
// .cache()
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
// println!("Add watch path to notify watcher success!");
// for result in rx {
// match result {
// Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
// Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
// }
// println!();
// }
// }
// // 6. With debounce wathcer in struct with std channel -> failed![stuck at receiver]
// fn main() {
// println!("Case 6: With debounce wathcer in struct with std channel");
// let mut notifier: NotifyHandler = NotifyHandler {
// notify_watcher: None,
// debounce_watcher: None,
// std_sender: None,
// std_receiver: None,
// debounce_std_sender: None,
// debounce_std_receiver: None,
// notify_tokio_sender: None,
// notify_tokio_receiver: None,
// debounce_tokio_sender: None,
// debounce_tokio_receiver: None,
// debounce_unbounded_sender: None,
// debounce_unbounded_receiver: None
// };
// notifier.initialize_debounce_watcher_with_std_channel();
// notifier.debounce_watcher_watch_with_std_channel("D:\\TEMP\\TestNote.txt");
// loop {
// std::thread::sleep(Duration::from_secs(5));
// println!("Program running");
// }
// }
// // 7. With debounce wathcer in main with tokio channel -> failed![no events send]
// #[tokio::main]
// async fn main() {
// println!("Case 7: With debounce wathcer in main with tokio channel");
// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
// println!("Try initialize notify watcher");
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
// let _ = tx.send(result);
// }).unwrap();
// println!("Initialize notify watcher success!");
// // Add a path to be watched. All files and directories at that path and
// // below will be monitored for changes.
// println!("Try add watch path to notify watcher");
// debouncer
// .watcher()
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
// debouncer
// .cache()
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
// println!("Add watch path to notify watcher success!");
// while let Some(events) = rx.recv().await {
// println!("{events:?}");
// }
// }
// // 8. With debounce wathcer in struct with tokio channel -> failed![no events send, stuck at while receive]
// #[tokio::main]
// async fn main() {
// println!("Case 8: With debounce wathcer in struct with tokio channel");
// let mut notifier: NotifyHandler = NotifyHandler {
// notify_watcher: None,
// debounce_watcher: None,
// std_sender: None,
// std_receiver: None,
// debounce_std_sender: None,
// debounce_std_receiver: None,
// notify_tokio_sender: None,
// notify_tokio_receiver: None,
// debounce_tokio_sender: None,
// debounce_tokio_receiver: None,
// debounce_unbounded_sender: None,
// debounce_unbounded_receiver: None
// };
// notifier.initialize_debounce_watcher_with_tokio_channel().await;
// notifier.debounce_watcher_watch_with_tokio_channel("D:\\TEMP\\TestNote.txt").await;
// loop {
// tokio::time::sleep(Duration::from_secs(5)).await;
// println!("Program running");
// }
// }
// // 9. With debounce wathcer in main with unbounded channel -> success!
// #[tokio::main]
// async fn main() {
// println!("Case 9: With debounce wathcer in main with unbounded channel");
// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// println!("Try initialize notify watcher");
// let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
// let _ = tx.send(result);
// }).unwrap();
// println!("Initialize notify watcher success!");
// // Add a path to be watched. All files and directories at that path and
// // below will be monitored for changes.
// println!("Try add watch path to notify watcher");
// debouncer
// .watcher()
// .watch(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive).unwrap();
// debouncer
// .cache()
// .add_root(Path::new("D:\\TEMP\\TestNote.txt"), RecursiveMode::Recursive);
// println!("Add watch path to notify watcher success!");
// while let Some(events) = rx.recv().await {
// println!("{events:?}");
// }
// }
// 10. With debounce wathcer in struct with unbounded channel -> failed![no events send]
#[tokio::main]
async fn main() {
println!("Case 10: With debounce wathcer in struct with unbounded channel");
let mut notifier: NotifyHandler = NotifyHandler {
notify_watcher: None,
debounce_watcher: None,
std_sender: None,
std_receiver: None,
debounce_std_sender: None,
debounce_std_receiver: None,
notify_tokio_sender: None,
notify_tokio_receiver: None,
debounce_tokio_sender: None,
debounce_tokio_receiver: None,
debounce_unbounded_sender: None,
debounce_unbounded_receiver: None
};
notifier.initialize_debounce_watcher_with_unbounded_channel().await;
notifier.debounce_watcher_watch_with_bounded_channel("D:\\TEMP\\TestNote.txt").await;
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
println!("Program running");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment