Last active
May 19, 2024 04:44
-
-
Save ryoqun/80adbd49c1658812b47fcf35bacb058f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::sync::atomic::AtomicUsize; | |
use std::sync::atomic::Ordering; | |
use std::sync::Arc; | |
use std::thread; | |
use std::time::Duration; | |
use std::time::Instant; | |
mod closable_channel { | |
use crossbeam_channel::{RecvError, SendError}; | |
use std::sync::{Arc, RwLock}; | |
#[derive(Clone)] | |
pub(super) struct Sender<T> { | |
sender: crossbeam_channel::Sender<T>, | |
is_opened: Arc<RwLock<bool>>, | |
} | |
#[derive(Clone)] | |
pub(super) struct Receiver<T> { | |
receiver: crossbeam_channel::Receiver<T>, | |
is_opened: Arc<RwLock<bool>>, | |
} | |
impl<T> Sender<T> { | |
pub(super) fn send(&self, t: T) -> Result<(), SendError<T>> { | |
let is_opened = self.is_opened.read().unwrap(); | |
let res = if *is_opened { | |
self.sender.send(t) | |
} else { | |
Err(SendError(t)) | |
}; | |
drop(is_opened); | |
res | |
} | |
} | |
impl<T> Receiver<T> { | |
pub(super) fn recv(&self) -> Result<T, RecvError> { | |
let is_opened: bool = *self.is_opened.read().unwrap(); | |
if is_opened { | |
self.receiver.recv() | |
} else { | |
self.receiver.try_recv().map_err(|_| RecvError) | |
} | |
} | |
pub(super) fn close(&self) { | |
*self.is_opened.write().unwrap() = false; | |
} | |
} | |
pub(super) fn unbounded<T>() -> (Sender<T>, Receiver<T>) { | |
let is_opened = Arc::new(RwLock::new(true)); | |
let (sender, receiver) = crossbeam_channel::unbounded(); | |
( | |
Sender { | |
sender, | |
is_opened: is_opened.clone(), | |
}, | |
Receiver { | |
receiver, | |
is_opened: is_opened.clone(), | |
}, | |
) | |
} | |
} | |
use crate::closable_channel::unbounded; | |
fn main() { | |
let s_count = Arc::new(AtomicUsize::default()); | |
let r_count = Arc::new(AtomicUsize::default()); | |
let (s, r) = unbounded(); | |
thread::scope(|scope| { | |
for _ in 0..4 { | |
let s = s.clone(); | |
let s_count = s_count.clone(); | |
scope.spawn(move || { | |
let started = Instant::now(); | |
loop { | |
let Ok(()) = s.send(()) else { break }; | |
s_count.fetch_add(1, Ordering::SeqCst); | |
if started.elapsed() > Duration::from_secs(1) { | |
thread::sleep(Duration::from_micros(1)); | |
} | |
} | |
eprintln!("sender ended!"); | |
}); | |
} | |
for _ in 0..4 { | |
let r = r.clone(); | |
let r_count = r_count.clone(); | |
scope.spawn(move || { | |
let started = Instant::now(); | |
while started.elapsed() < Duration::from_secs(1) { | |
let Ok(()) = r.recv() else { break }; | |
r_count.fetch_add(1, Ordering::SeqCst); | |
} | |
eprintln!("closing..."); | |
r.close(); | |
while let Ok(()) = r.recv() { | |
r_count.fetch_add(1, Ordering::SeqCst); | |
} | |
eprintln!("ended!"); | |
}); | |
} | |
drop((s, r)); | |
}); | |
dbg!((&s_count, &r_count)); | |
assert_eq!( | |
Arc::into_inner(s_count).unwrap().into_inner(), | |
Arc::into_inner(r_count).unwrap().into_inner() | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment