@jonhoo
Created August 5, 2020 22:23
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// Flavors:
//  - Synchronous channels: Channel where send() can block. Limited capacity.
//    - Mutex + Condvar + VecDeque
//    - Atomic VecDeque (atomic queue) + thread::park + thread::Thread::unpark
//  - Asynchronous channels: Channel where send() cannot block. Unbounded.
//    - Mutex + Condvar + VecDeque
//    - Mutex + Condvar + LinkedList
//    - Atomic linked list, linked list of T
//    - Atomic block linked list, linked list of atomic VecDeque<T>
//  - Rendezvous channels: Synchronous with capacity = 0. Used for thread synchronization.
//  - Oneshot channels: Any capacity. In practice, only one call to send().

// async/await

pub struct Sender<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        // Track the number of live senders so the receiver can detect closure.
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders += 1;
        drop(inner);

        Sender {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders -= 1;
        let was_last = inner.senders == 0;
        drop(inner);
        if was_last {
            // Wake a receiver blocked in recv() so it can see that the channel is closed.
            self.shared.available.notify_one();
        }
    }
}

impl<T> Sender<T> {
    pub fn send(&mut self, t: T) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.queue.push_back(t);
        // Release the lock before notifying so the woken receiver can take it right away.
        drop(inner);
        self.shared.available.notify_one();
    }
}
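
pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
    // Items already taken out of the shared queue; only this receiver touches them.
    buffer: VecDeque<T>,
}

impl<T> Receiver<T> {
    pub fn recv(&mut self) -> Option<T> {
        // Fast path: serve from the local buffer without taking the lock.
        if let Some(t) = self.buffer.pop_front() {
            return Some(t);
        }

        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(t) => {
                    // Take everything else that is queued in one go; the local
                    // buffer is empty here, so the swap leaves the shared queue empty.
                    std::mem::swap(&mut self.buffer, &mut inner.queue);
                    return Some(t);
                }
                // Empty queue and no senders left: the channel is closed.
                None if inner.senders == 0 => return None,
                None => {
                    // wait() releases the lock while blocked and reacquires it on wake-up.
                    inner = self.shared.available.wait(inner).unwrap();
                }
            }
        }
    }
}

impl<T> Iterator for Receiver<T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        self.recv()
    }
}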

struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Inner {
        queue: VecDeque::default(),
        senders: 1,
    };
    let shared = Shared {
        inner: Mutex::new(inner),
        available: Condvar::new(),
    };
    let shared = Arc::new(shared);
    (
        Sender {
            shared: shared.clone(),
        },
        Receiver {
            shared: shared.clone(),
            // Required field: the local buffer starts out empty.
            buffer: VecDeque::default(),
        },
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ping_pong() {
        let (mut tx, mut rx) = channel();
        tx.send(42);
        assert_eq!(rx.recv(), Some(42));
    }

    #[test]
    fn closed_tx() {
        let (tx, mut rx) = channel::<()>();
        drop(tx);
        assert_eq!(rx.recv(), None);
    }

    #[test]
    fn closed_rx() {
        let (mut tx, rx) = channel();
        drop(rx);
        tx.send(42);
    }
}
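
The flavor list at the top of the file mentions synchronous (bounded) channels built from Mutex + Condvar + VecDeque, while the code above implements the unbounded flavor. As a rough sketch of how the bounded flavor might look, assuming a fixed capacity and a second Condvar for "space available": the names `Bounded`, `not_empty`, and `not_full` are hypothetical and not part of the gist, and sender counting and closure are left out to keep it short.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical bounded queue (not part of the gist).
pub struct Bounded<T> {
    queue: Mutex<VecDeque<T>>,
    capacity: usize,
    not_empty: Condvar, // signalled after a push
    not_full: Condvar,  // signalled after a pop
}

impl<T> Bounded<T> {
    pub fn new(capacity: usize) -> Arc<Self> {
        // Capacity 0 would be the rendezvous flavor, which needs a different design.
        assert!(capacity > 0);
        Arc::new(Bounded {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
        })
    }

    /// Blocks while the queue is full; this is what makes the channel "synchronous".
    pub fn send(&self, t: T) {
        let mut queue = self.queue.lock().unwrap();
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        queue.push_back(t);
        drop(queue);
        self.not_empty.notify_one();
    }

    /// Blocks while the queue is empty.
    pub fn recv(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        loop {
            if let Some(t) = queue.pop_front() {
                drop(queue);
                // A slot just opened up, so wake one blocked sender.
                self.not_full.notify_one();
                return t;
            }
            queue = self.not_empty.wait(queue).unwrap();
        }
    }
}
```

With a capacity of 2, a third `send` blocks until some thread calls `recv`; both sides share the queue through clones of the returned `Arc`.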
@songzhi
songzhi commented Aug 9, 2020

Since an Arc is shared between Sender and Receiver, why not use Arc::strong_count to check whether any Sender is still alive? It's an mpsc channel, so if the Receiver sees strong_count == 1, we know that only the Receiver is alive and no Sender is left.

@jonhoo
Author

jonhoo commented Aug 9, 2020

I actually already talked through that approach in the video :)

@songzhi
songzhi commented Aug 10, 2020

oops...
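
For readers skimming this thread, here is a minimal sketch of what the `Arc::strong_count` check suggested above could look like. It is not the gist's code: the types are hypothetical stand-ins reduced to the `Arc` handle, `has_senders` is an invented helper name, and the sketch assumes there is exactly one Receiver.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the gist's types, reduced to the shared handle.
pub struct Shared;

pub struct Sender {
    shared: Arc<Shared>,
}

pub struct Receiver {
    shared: Arc<Shared>,
}

impl Clone for Sender {
    fn clone(&self) -> Self {
        // No manual counter: the Arc's strong count does the bookkeeping.
        Sender {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl Receiver {
    /// True while at least one Sender still holds a handle.
    /// Assumes a single Receiver, which itself accounts for one strong reference.
    pub fn has_senders(&self) -> bool {
        Arc::strong_count(&self.shared) > 1
    }
}

pub fn channel() -> (Sender, Receiver) {
    let shared = Arc::new(Shared);
    (
        Sender {
            shared: Arc::clone(&shared),
        },
        Receiver { shared },
    )
}
```

Two things to watch with this approach: another thread can change the strong count at any moment, so the value is only a snapshot; and inside a `Drop` impl for Sender the `Arc` field has not been released yet, so a receiver woken from there may still see the departing sender in the count.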

@jbesraa
jbesraa commented Aug 21, 2020

In line 113, why don't you pass buffer to Receiver?

thanks for the videos (:

@jonhoo
Author

jonhoo commented Aug 21, 2020

@jbesraa By "buffer" do you mean shared? We could totally do that!

@jbesraa
jbesraa commented Aug 21, 2020

mmm
rust-analyzer shows an error on line 113 because Receiver expects a buffer field but doesn't get one

@jonhoo
Author

jonhoo commented Aug 21, 2020

Oooooh, I see what you mean! Yes, the code is incomplete. The Receiver constructor needs:

  buffer: Default::default(),

@PreetamSing

@jonhoo In this implementation of mpsc, what's stopping us from cloning the Receiver into multiple consumers? Since the Receiver also uses an Arc, we could write the test below. (I commented out the buffer field in Receiver, which relies on the assumption that there is only ever a single consumer, just for simplicity's sake.)

    #[test]
    fn multiple_rx() { // This test passes!
        let (mut tx, rx) = channel();
        let mut handles = Vec::new();
        tx.send(1);
        tx.send(2);
        let mut rx1 = Receiver {
            shared: Arc::clone(&rx.shared),
        };

        handles.push(std::thread::spawn(move || {
            let recv_val = rx1.recv();
            println!("{:#?}", recv_val);
            assert_eq!(Some(1), recv_val);
        }));

        let mut rx2 = Receiver {
            shared: Arc::clone(&rx.shared),
        };

        handles.push(std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(100));
            let recv_val = rx2.recv();
            println!("{:#?}", recv_val);
            assert_eq!(Some(2), recv_val);
        }));

        for handle in handles {
            handle.join().unwrap();
        }
    }

And this test passes! So there's nothing really preventing us from having multiple consumers. Is that intended, or is there some other explanation?
Thanks for the great content!

@jonhoo
Author

jonhoo commented May 14, 2022

Yep, that would work just fine for this implementation!
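
To make the multi-consumer idea concrete, here is a self-contained sketch of a variant where the Receiver is `Clone` and has no local buffer. It is an illustration under stated assumptions rather than the gist's code: `Clone for Receiver` is added, `Clone for Sender` is omitted for brevity, and `send`/`recv` take `&self`. One change it makes on purpose: the last Sender to drop calls `notify_all`, since several receivers may be blocked at once and `notify_one` would wake only one of them.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

pub struct Sender<T> {
    shared: Arc<Shared<T>>,
}

pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
}

// Assumption of this sketch: receivers are cloneable and keep no local buffer,
// so every item goes straight from the shared queue to whichever receiver pops it.
impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        Receiver {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders -= 1;
        let was_last = inner.senders == 0;
        drop(inner);
        if was_last {
            // Every blocked receiver needs to learn that the channel is closed.
            self.shared.available.notify_all();
        }
    }
}

impl<T> Sender<T> {
    pub fn send(&self, t: T) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.queue.push_back(t);
        drop(inner);
        self.shared.available.notify_one();
    }
}

impl<T> Receiver<T> {
    pub fn recv(&self) -> Option<T> {
        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(t) => return Some(t),
                None if inner.senders == 0 => return None,
                None => inner = self.shared.available.wait(inner).unwrap(),
            }
        }
    }
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Shared {
        inner: Mutex::new(Inner {
            queue: VecDeque::new(),
            senders: 1,
        }),
        available: Condvar::new(),
    });
    (
        Sender {
            shared: Arc::clone(&shared),
        },
        Receiver { shared },
    )
}
```

Each item is delivered to exactly one of the receivers; which one gets it depends on thread scheduling, so consumers should not rely on any particular distribution.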
