Skip to content

Instantly share code, notes, and snippets.

@ryoqun
Last active May 19, 2024 04:44
Show Gist options
  • Save ryoqun/80adbd49c1658812b47fcf35bacb058f to your computer and use it in GitHub Desktop.
Save ryoqun/80adbd49c1658812b47fcf35bacb058f to your computer and use it in GitHub Desktop.
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
mod closable_channel {
use crossbeam_channel::{RecvError, SendError};
use std::sync::{Arc, RwLock};
#[derive(Clone)]
pub(super) struct Sender<T> {
sender: crossbeam_channel::Sender<T>,
is_opened: Arc<RwLock<bool>>,
}
#[derive(Clone)]
pub(super) struct Receiver<T> {
receiver: crossbeam_channel::Receiver<T>,
is_opened: Arc<RwLock<bool>>,
}
impl<T> Sender<T> {
pub(super) fn send(&self, t: T) -> Result<(), SendError<T>> {
let is_opened = self.is_opened.read().unwrap();
let res = if *is_opened {
self.sender.send(t)
} else {
Err(SendError(t))
};
drop(is_opened);
res
}
}
impl<T> Receiver<T> {
pub(super) fn recv(&self) -> Result<T, RecvError> {
let is_opened: bool = *self.is_opened.read().unwrap();
if is_opened {
self.receiver.recv()
} else {
self.receiver.try_recv().map_err(|_| RecvError)
}
}
pub(super) fn close(&self) {
*self.is_opened.write().unwrap() = false;
}
}
pub(super) fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let is_opened = Arc::new(RwLock::new(true));
let (sender, receiver) = crossbeam_channel::unbounded();
(
Sender {
sender,
is_opened: is_opened.clone(),
},
Receiver {
receiver,
is_opened: is_opened.clone(),
},
)
}
}
use crate::closable_channel::unbounded;
fn main() {
let s_count = Arc::new(AtomicUsize::default());
let r_count = Arc::new(AtomicUsize::default());
let (s, r) = unbounded();
thread::scope(|scope| {
for _ in 0..4 {
let s = s.clone();
let s_count = s_count.clone();
scope.spawn(move || {
let started = Instant::now();
loop {
let Ok(()) = s.send(()) else { break };
s_count.fetch_add(1, Ordering::SeqCst);
if started.elapsed() > Duration::from_secs(1) {
thread::sleep(Duration::from_micros(1));
}
}
eprintln!("sender ended!");
});
}
for _ in 0..4 {
let r = r.clone();
let r_count = r_count.clone();
scope.spawn(move || {
let started = Instant::now();
while started.elapsed() < Duration::from_secs(1) {
let Ok(()) = r.recv() else { break };
r_count.fetch_add(1, Ordering::SeqCst);
}
eprintln!("closing...");
r.close();
while let Ok(()) = r.recv() {
r_count.fetch_add(1, Ordering::SeqCst);
}
eprintln!("ended!");
});
}
drop((s, r));
});
dbg!((&s_count, &r_count));
assert_eq!(
Arc::into_inner(s_count).unwrap().into_inner(),
Arc::into_inner(r_count).unwrap().into_inner()
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment