Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created May 13, 2023 10:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lithdew/4a6a47ca22eb4468fb8adb4453d87aa4 to your computer and use it in GitHub Desktop.
Save lithdew/4a6a47ca22eb4468fb8adb4453d87aa4 to your computer and use it in GitHub Desktop.
rust: single-threaded futures-aware one-shot broadcast channel
pub mod oneshot_broadcast {
use std::{
cell::UnsafeCell,
pin::Pin,
task::{Context, Poll, Waker},
};
use futures_lite::Future;
use pin_list::PinList;
use pin_project_lite::pin_project;
type PinListTypes = dyn pin_list::Types<
Id = pin_list::id::Checked,
Protected = Waker,
Removed = (),
Unprotected = (),
>;
pub struct Channel<T: Clone> {
data: UnsafeCell<Option<T>>,
waiters: UnsafeCell<PinList<PinListTypes>>,
}
impl<T: Clone> Channel<T> {
pub fn new() -> Self {
Self {
data: UnsafeCell::new(None),
waiters: UnsafeCell::new(PinList::new(pin_list::id::Checked::new())),
}
}
pub fn send(&self, data: T) {
unsafe { *self.data.get() = Some(data) };
let mut cursor = unsafe { (*self.waiters.get()).cursor_front_mut() };
while let Ok(waker) = cursor.remove_current(()) {
waker.wake();
}
}
pub fn recv(&self) -> Recv<'_, T> {
Recv {
chan: self,
node: pin_list::Node::new(),
}
}
}
pin_project! {
pub struct Recv<'chan, T: Clone> {
chan: &'chan Channel<T>,
#[pin]
node: pin_list::Node<PinListTypes>,
}
}
impl<'chan, T: Clone> Future for Recv<'chan, T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let waiters = unsafe { &mut *this.chan.waiters.get() };
let data = unsafe { &*this.chan.data.get() };
if let Some(node) = this.node.as_mut().initialized_mut() {
if let Err(node) = node.take_removed(waiters) {
*node.protected_mut(waiters).unwrap() = cx.waker().clone();
return Poll::Pending;
}
}
if let Some(data) = data {
return Poll::Ready(data.clone());
}
waiters.push_back(this.node, cx.waker().clone(), ());
Poll::Pending
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment