Skip to content

Instantly share code, notes, and snippets.

@thomcc
Last active Apr 2, 2021
Embed
What would you like to do?
spsc queue
#![no_std]
extern crate alloc;
#[cfg(test)]
extern crate std;
use alloc::sync::Arc;
use core::{
cell::UnsafeCell,
mem::MaybeUninit,
ptr::{self, NonNull},
sync::atomic::{Ordering::*, *},
};
mod node_alloc;
use node_alloc::NodeAlloc;
// important requirements to understand:
// - SPSC: 1 consumer thread (can call pop), 1 producer thread (can call push).
//   this is enforced by the wrappers to this type (which are `Send`, `!Sync`,
//   and `!Clone`). This should be obvious but just mentioning it to be
//   explicit.
// - the queue is never totally empty, there's a dummy node that is always
//   present.
// - Push adds stuff to `tail`, pop takes it from the `head`.
// - Producer thread controls the tail. It also controls the `next` pointer on
//   the most recently added node in the queue.
#[repr(C, align(64))]
pub struct SpscQueue<T> {
    // Consumer end: always points at the current dummy node; written only by
    // the consumer (`pop_internal`). Wrapped in `Align64` so it sits on its
    // own 64-byte line — presumably to avoid false sharing with `tail`.
    head: Align64<AtomicPtr<SpscNode<T>>>,
    // Producer end: the most recently pushed node; written only by the
    // producer (`push_internal`, and `new` during setup).
    tail: Align64<AtomicPtr<SpscNode<T>>>,
    // Free-list allocator for nodes. Producer-owned, except that the consumer
    // calls `dealloc` on nodes it pops.
    node_allocator: NodeAlloc<SpscNode<T>>,
}
impl<T> Drop for SpscQueue<T> {
    /// Tear down the queue: drop any values still enqueued, release the
    /// dummy node, then destroy the node allocator itself.
    fn drop(&mut self) {
        // Drain remaining elements; `pop` returns them owned so they drop here.
        while let Some(_) = self.pop() {}
        debug_assert!(self.is_empty());
        // After draining, head == tail == the dummy node. It holds no data,
        // but still owns an allocation that must go back to the allocator.
        let dummy = self.head.load(Acquire);
        debug_assert!(!dummy.is_null());
        unsafe {
            self.node_allocator.dealloc(NonNull::new_unchecked(dummy));
            // Null the ends so any accidental later use faults loudly.
            self.head.store(core::ptr::null_mut(), Relaxed);
            self.tail.store(core::ptr::null_mut(), Relaxed);
            // Frees every node the allocator ever handed out; safe because
            // every node (including the dummy) has been dealloc'd above.
            self.node_allocator.destroy();
        }
    }
}
impl<T> SpscQueue<T> {
    // FIXME: doesn't need to return an Arc
    /// Create a new, empty queue containing only the dummy node.
    #[inline]
    pub fn new() -> Arc<Self> {
        let this = Arc::new(Self {
            head: Align64(AtomicPtr::new(ptr::null_mut())),
            tail: Align64(AtomicPtr::new(ptr::null_mut())),
            node_allocator: NodeAlloc::new(),
        });
        // safety: `this` isn't shared yet, so it doesn't matter which thread
        // the `node_allocator` is used from.
        let stub = unsafe {
            let stub = this.node_allocator.alloc();
            // The allocator hands back uninitialized memory; initialize every
            // field in place before the pointer is stored anywhere.
            core::ptr::addr_of_mut!((*stub.as_ptr()).next).write(AtomicPtr::new(ptr::null_mut()));
            core::ptr::addr_of_mut!((*stub.as_ptr()).data)
                .write(UnsafeCell::new(MaybeUninit::uninit()));
            stub.as_ptr()
        };
        // The queue is never truly empty: head and tail both start at the
        // dummy node.
        this.head.store(stub, Relaxed);
        this.tail.store(stub, Relaxed);
        this
    }

    /// Producer-side: append `v` at the back of the queue.
    #[inline]
    pub fn push(&self, v: T) {
        unsafe {
            let node = self.node_allocator.alloc();
            // Fully initialize the node before it becomes reachable from the
            // queue (publication happens inside `push_internal`).
            ptr::addr_of_mut!((*node.as_ptr()).next).write(AtomicPtr::new(ptr::null_mut()));
            ptr::addr_of_mut!((*node.as_ptr()).data).write(UnsafeCell::new(MaybeUninit::new(v)));
            self.push_internal(node.as_ptr());
        }
    }

    /// True when the queue holds no items, i.e. head and tail point at the
    /// same (dummy) node. Callable from either side.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Acquire);
        let tail = self.tail.load(Acquire);
        core::ptr::eq(head, tail)
    }

    // Safety: `node` must be completely owned (unique access — just out of
    // the alloc) and must be initialized: data written, `next` null.
    #[inline]
    unsafe fn push_internal(&self, node: *mut SpscNode<T>) {
        debug_assert!((*node).next.load(Relaxed).is_null());
        // relaxed load is fine because we're the producer and thus we own it
        let back = self.tail.load(Relaxed);
        // as soon as i set `back.next` here, `node` is visible to the consumer,
        // and no longer entirely ours. however, until we add another node,
        // `(*node).next` is still ours, as it's the last item in the queue.
        //
        // release here ensures node.next and node.data are written before
        // publishing it to other threads.
        (*back).next.store(node, Release);
        // now advance tail — again, tail is just for us, so relaxed is fine.
        self.tail.store(node, Relaxed);
    }

    /// Consumer-side: detach the front node. Its payload is moved into the
    /// current dummy, and the old dummy — which now owns the payload — is
    /// returned. Returns `None` when the queue is empty.
    #[inline]
    fn pop_internal(&self) -> Option<NonNull<SpscNode<T>>> {
        unsafe {
            // head is totally owned by us (the consumer), so relaxed is fine.
            let front = self.head.load(Relaxed);
            // front is the current dummy; it is never null, and has no data.
            debug_assert!(!front.is_null());
            // sync point with the release store in `push_internal`.
            let next = (*front).next.load(Acquire);
            if next.is_null() {
                // empty
                return None;
            }
            // - the node `next` is partially (mostly) owned by us;
            //   partial because the producer owns `(*next).next`.
            // - front will be returned, and is entirely owned by us.
            debug_assert!(!core::ptr::eq(front, next));
            // Move the payload into `front`. A swap keeps both slots valid
            // `MaybeUninit`s (the dummy's uninit payload moves into `next`,
            // which becomes the new dummy).
            core::ptr::swap_nonoverlapping((*next).data.get(), (*front).data.get(), 1);
            // update our head — we own it.
            self.head.store(next, Relaxed);
            // next is now the dummy; return `front`, which holds the value.
            Some(NonNull::new_unchecked(front))
        }
    }

    /// Consumer-side: pop the front value, if any.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        match self.pop_internal() {
            None => None,
            Some(node) => unsafe {
                // Take the value out, leaving uninit in its place, then hand
                // the node back to the allocator's free list.
                let ret = ptr::replace(
                    (*ptr::addr_of!((*node.as_ptr()).data)).get(),
                    MaybeUninit::uninit(),
                )
                .assume_init();
                self.node_allocator.dealloc(node);
                Some(ret)
            },
        }
    }
}
// One queue node. Lives in memory handed out by `NodeAlloc`.
#[repr(C)]
struct SpscNode<T> {
    // Link to the next (newer) node. Owned by the producer for the newest
    // node in the queue; loaded with Acquire by the consumer.
    next: AtomicPtr<SpscNode<T>>,
    // Payload slot. Uninitialized while this node is the dummy.
    data: UnsafeCell<MaybeUninit<T>>,
    _pad: Pad,
}
// 64 bytes of trailing (uninitialized, never-read) padding — presumably to
// keep adjacent nodes off each other's cache lines; TODO confirm intent.
type Pad = MaybeUninit<[u8; 64]>;
/// Forces its contents onto a 64-byte (cache-line-sized) boundary.
#[repr(C, align(64))]
struct Align64<T>(T);

impl<T> core::ops::Deref for Align64<T> {
    type Target = T;

    /// Transparently expose the wrapped value.
    #[inline]
    fn deref(&self) -> &T {
        let Align64(inner) = self;
        inner
    }
}
#[inline]
pub fn spsc_channel<T>() -> (SpscSender<T>, SpscReceiver<T>) {
let q = SpscQueue::new();
(SpscSender(q.clone()), SpscReceiver(q))
}
/// Producer handle: the only holder allowed to push (`send`).
pub struct SpscSender<T>(Arc<SpscQueue<T>>);
// Safety: per the notes on SpscQueue, correctness relies on exactly one
// producer thread; the handle may move between threads (Send) but is not
// Sync or Clone, preserving that invariant.
unsafe impl<T: Send> Send for SpscSender<T> {}
/// Consumer handle: the only holder allowed to pop (`recv`).
pub struct SpscReceiver<T>(Arc<SpscQueue<T>>);
// Safety: same reasoning as for SpscSender, on the single-consumer side.
unsafe impl<T: Send> Send for SpscReceiver<T> {}
impl<T: Send> SpscSender<T> {
#[inline]
pub fn send(&self, v: T) {
self.0.push(v);
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl<T: Send> SpscReceiver<T> {
#[inline]
pub fn recv(&self) -> Option<T> {
self.0.pop()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;

    // Single-threaded smoke test: FIFO order, `is_empty` tracking, and
    // interleaved send/recv (which exercises dummy-node recycling through the
    // allocator free list). Boxed payloads so miri would catch payload leaks
    // or double frees.
    #[test]
    fn linear_test() {
        let (send, recv) = spsc_channel::<Box<i32>>();
        assert!(send.is_empty() && recv.is_empty());
        send.send(Box::new(1));
        assert!(!recv.0.is_empty());
        send.send(Box::new(2));
        assert!(!recv.0.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 1);
        assert!(!recv.0.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 2);
        assert!(recv.0.is_empty());
        assert_eq!(recv.recv(), None);
        // Refill after fully draining: the queue must keep working once it
        // has gone back to just the dummy node.
        send.send(Box::new(3));
        assert!(!recv.0.is_empty());
        send.send(Box::new(4));
        let v = *recv.recv().unwrap();
        assert_eq!(v, 3);
        assert!(!recv.0.is_empty());
        send.send(Box::new(5));
        let v = *recv.recv().unwrap();
        assert_eq!(v, 4);
        assert!(!recv.0.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 5);
        assert!(recv.0.is_empty());
        assert_eq!(recv.recv(), None);
    }

    // Concurrency test driven by the external `cobb` harness: each iteration
    // runs `test` on 2 threads (thread 0 pushes 4 encoded values, thread 1
    // pops whatever it observes), then `after_each` drains leftovers on one
    // thread and verifies every send was matched by exactly one intact recv.
    #[test]
    fn cobb_test() {
        use std::cell::Cell;
        // Per-iteration shared state. `sends`/`recvs` are plain Cells touched
        // from different threads only in distinct, harness-separated phases;
        // the unsafe Send/Sync impls below assert that discipline.
        struct RacyQueueState<T> {
            queue: Arc<SpscQueue<T>>,
            sends: Cell<usize>,
            recvs: Cell<usize>,
        }
        unsafe impl<T> Send for RacyQueueState<T> {}
        unsafe impl<T> Sync for RacyQueueState<T> {}
        // Keep miri runs tractable; hammer much harder under real execution.
        const ITERATIONS: usize = if cfg!(miri) { 500 } else { 40000 };
        cobb::run_test(cobb::TestCfg {
            threads: 2,
            iterations: ITERATIONS,
            setup: || RacyQueueState::<Box<usize>> {
                queue: SpscQueue::new(),
                sends: Cell::new(0),
                recvs: Cell::new(0),
            },
            test: |state, tctx| {
                // SANITYCHECK.fetch_add(1, Ordering::Relaxed);
                if tctx.thread_index() == 0 {
                    // Producer role: values are encoded as 100*i + 10 so the
                    // consumer can validate them structurally.
                    for i in 0..4 {
                        state.queue.push(Box::new(100 * i + 10));
                        state.sends.set(1 + state.sends.get());
                    }
                } else {
                    // Consumer role: pop until the queue looks empty.
                    while let Some(v) = state.queue.pop() {
                        state.recvs.set(1 + state.recvs.get());
                        assert!((*v - 10) % 100 == 0);
                        assert!((0..4).contains(&((*v - 10) / 100)));
                    }
                }
            },
            before_each: |state| {
                assert!(state.queue.is_empty());
                state.sends.set(0);
                state.recvs.set(0);
            },
            after_each: |state| {
                {
                    // Drain anything the consumer thread didn't get to.
                    while let Some(v) = state.queue.pop() {
                        state.recvs.set(1 + state.recvs.get());
                        assert!((*v - 10) % 100 == 0);
                        assert!((0..4).contains(&((*v - 10) / 100)));
                    }
                }
                assert_eq!(state.sends.get(), state.recvs.get());
                state.sends.set(0);
                state.recvs.set(0);
            },
            ..Default::default()
        });
    }
}
use core::ptr;
use core::{
cell::Cell,
ptr::NonNull,
sync::atomic::{Ordering::*, *},
};
// Nullable raw pointer to a node.
type Ptr<T> = Option<NonNull<T>>;
// Single-thread-mutable slot holding such a pointer.
type CellPtr<T> = Cell<Ptr<T>>;
// all fields are owned by producer. only thing a consumer calls is
// allocator.dealloc(node), which is mostly to prove the allocator still alive.
pub(crate) struct NodeAlloc<T> {
    // Most recently allocated node — the "in use" end of the list; `alloc`
    // links each new node in here.
    head: CellPtr<NodeHeader<T>>,
    // Oldest node — the first candidate for reuse once its `free` flag is
    // set by `dealloc`.
    tail: CellPtr<NodeHeader<T>>,
    // Number of nodes ever allocated and not yet destroyed; checked against
    // the list length in `destroy`.
    count: Cell<usize>,
}
// node header without the data.
type NodeHeader<NodeTy> = NodeHeaderOptData<NodeTy, 0>;
// the full node header. used to compute layout for allocating and deallocating,
// but all actual use is via `NodeHeader<NodeTy>`.
// NOTE(review): for payloads whose alignment is below the header's, repr(C)
// places `_data` at an offset smaller than size_of::<NodeHeader<NodeTy>>(),
// while alloc/dealloc address the payload via pointer add(1)/sub(1) — verify
// the layout math agrees for small-aligned NodeTy.
type NodeHeaderWithData<NodeTy> = NodeHeaderOptData<NodeTy, 1>;
// Header that optionally (N = 0 or 1) carries the payload inline.
#[repr(C)]
struct NodeHeaderOptData<NodeTy, const N: usize> {
    // Intrusive list link. owned by alloc(). never touched by dealloc.
    next: CellPtr<NodeHeaderOptData<NodeTy, 0>>,
    // read by alloc and written by dealloc — could use LSB of `next` as a
    // "free" flag, but would slow stuff down.
    free: AtomicBool,
    // the payload. not owned by the allocator.
    _data: [NodeTy; N],
}
// Borrow the `free` flag of the header behind `n`.
// Safety: `n` must point to a live, initialized `NodeHeader<T>`.
#[inline]
unsafe fn hdr_free<'a, T>(n: *mut NodeHeader<T>) -> &'a AtomicBool {
    let flag = ptr::addr_of!((*n).free);
    &*flag
}
// Borrow the free-list link of the header behind `n`.
// Safety: `n` must point to a live, initialized `NodeHeader<T>`.
#[inline]
unsafe fn hdr_next<'a, T>(n: *mut NodeHeader<T>) -> &'a CellPtr<NodeHeader<T>> {
    let link = ptr::addr_of!((*n).next);
    &*link
}
impl<T> NodeAlloc<T> {
    /// Create an empty allocator with no cached nodes.
    #[inline]
    pub(crate) const fn new() -> Self {
        Self {
            head: Cell::new(None),
            tail: Cell::new(None),
            count: Cell::new(0),
        }
    }

    // Layout of one allocation: a `NodeHeader<T>` immediately followed by the
    // `T` payload at offset `size_of::<NodeHeader<T>>()` — which is exactly
    // where `alloc`/`dealloc` address it via pointer `add(1)`/`sub(1)`.
    //
    // BUGFIX: `Layout::new::<NodeHeaderWithData<T>>()` is NOT correct here.
    // With `repr(C)`, for `T` whose alignment is below the header's, `_data`
    // lands inside what is tail padding in `NodeHeader<T>` (e.g. `T = u16`:
    // payload offset 10, but `size_of::<NodeHeader<u16>>() == 16` and
    // `size_of::<NodeHeaderWithData<u16>>() == 16` too), so the `add(1)`
    // arithmetic would write past the end of the allocation.
    #[inline]
    fn node_layout() -> alloc::alloc::Layout {
        let header = alloc::alloc::Layout::new::<NodeHeader<T>>();
        let (layout, offset) = header
            .extend(alloc::alloc::Layout::new::<T>())
            .expect("node layout overflow");
        // `extend` rounds `header.size()` up to `align_of::<T>()`; the header
        // contains `[T; 0]`, so its alignment (and hence its size) is already
        // a multiple of `align_of::<T>()`, making the payload offset exactly
        // `size_of::<NodeHeader<T>>()` — matching `add(1)`.
        debug_assert_eq!(offset, core::mem::size_of::<NodeHeader<T>>());
        layout.pad_to_align()
    }

    /// Allocate (or reuse) storage for one `T`, returning a pointer to the
    /// payload slot; the node header lives immediately before it.
    ///
    /// # Safety
    /// Must only be called from a single producer thread (at a time), which
    /// should, for the most part, "own" the allocator. The returned memory is
    /// uninitialized — the caller must initialize it before use.
    #[inline]
    pub(crate) unsafe fn alloc(&self) -> NonNull<T> {
        let node = match self.tail.get() {
            // if the tail's free, pop it off the free list. note: acquire
            // ensures we see anything that happened before it was marked free —
            // syncs with the release in dealloc
            Some(tail) if hdr_free(tail.as_ptr()).load(Acquire) => {
                self.tail.set(hdr_next(tail.as_ptr()).replace(None));
                // relaxed is fine because we own it now.
                hdr_free(tail.as_ptr()).store(false, Relaxed);
                tail
            }
            // no tail or tail not free: allocate fresh
            _ => {
                let layout = Self::node_layout();
                let hopefully_ptr = alloc::alloc::alloc(layout).cast::<NodeHeader<T>>();
                let node_hdr = NonNull::new(hopefully_ptr).unwrap_or_else(|| {
                    alloc::alloc::handle_alloc_error(layout);
                });
                self.count.set(self.count.get() + 1);
                ptr::addr_of_mut!((*node_hdr.as_ptr()).next).write(Cell::new(None));
                ptr::addr_of_mut!((*node_hdr.as_ptr()).free).write(AtomicBool::new(false));
                node_hdr
            }
        };
        debug_assert!(!hdr_free(node.as_ptr()).load(Relaxed));
        debug_assert!(hdr_next(node.as_ptr()).get().is_none());
        // Link the node in at the `head` (most recently allocated) end.
        if let Some(head) = self.head.get() {
            hdr_next(head.as_ptr()).set(Some(node));
        }
        if self.tail.get().is_none() {
            self.tail.set(Some(node));
        }
        self.head.set(Some(node));
        // Step over the header to the payload slot and return it.
        let ptr = node.as_ptr().add(1).cast::<T>();
        debug_assert!(!ptr.is_null());
        NonNull::new_unchecked(ptr)
    }

    /// Return a payload pointer previously produced by `alloc` to the free
    /// list, by flagging its header. May be called by the consumer thread.
    ///
    /// # Safety
    /// `ptr` must have come from `self.alloc()` and not already be freed.
    #[inline]
    pub(crate) unsafe fn dealloc(&self, ptr: NonNull<T>) {
        let header = ptr.as_ptr().cast::<NodeHeader<T>>().sub(1);
        // we should not be marked free, since we were just in use.
        debug_assert!(!hdr_free(header).load(Relaxed));
        // Mark as free using release — this ensures no prior stores go past
        // here, but also syncs with acquire in alloc.
        hdr_free(header).store(true, Release);
        // That's all.
    }

    /// Free every node this allocator ever handed out.
    ///
    /// # Safety
    /// Every node must already have been returned via `dealloc`, and neither
    /// the allocator nor any node may be used afterwards.
    #[inline]
    pub(crate) unsafe fn destroy(&mut self) {
        let count = self.count.get();
        if count == 0 && !cfg!(debug_assertions) {
            return;
        }
        let mut n = 0;
        // Must be the exact layout used in `alloc`.
        let layout = Self::node_layout();
        while let Some(tail) = self.tail.get() {
            debug_assert!(hdr_free(tail.as_ptr()).load(Relaxed));
            self.tail.set(hdr_next(tail.as_ptr()).replace(None));
            let ptr = tail.as_ptr().cast::<u8>();
            if cfg!(debug_assertions) && !cfg!(miri) {
                // Scribble over freed memory to surface use-after-free in
                // debug builds; miri can track UAF without our help.
                ptr.write_bytes(0xfd, layout.size());
            }
            alloc::alloc::dealloc(ptr, layout);
            n += 1;
        }
        debug_assert_eq!(n, count);
        self.tail.set(None);
        self.head.set(None);
        self.count.set(0);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment