Last active
April 2, 2021 09:58
-
-
Save thomcc/8ff3c8529760936bb6e8da753f725936 to your computer and use it in GitHub Desktop.
SPSC queue — a lock-free single-producer, single-consumer queue with node recycling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![no_std] | |
extern crate alloc; | |
#[cfg(test)] | |
extern crate std; | |
use alloc::sync::Arc; | |
use core::{ | |
cell::UnsafeCell, | |
mem::MaybeUninit, | |
ptr::{self, NonNull}, | |
sync::atomic::{Ordering::*, *}, | |
}; | |
mod node_alloc; | |
use node_alloc::NodeAlloc; | |
// important requirements to understand: | |
// - SPSC: 1 consumer thread (can call pop), 1 producer thread (can call push). | |
// this is enforced by the wrappers to this type (which are `Send`, `!Sync`, | |
// and `!Clone`). This should be obvious but just mentioning it to be | |
// explicit. | |
// - the queue is never totally empty, there's a dummy node that is always | |
// present. | |
// - Push adds stuff to `tail`, pop takes it from the `head`. | |
// - Producer thread controls the tail. It also controls the `next` pointer on | |
// the most recently added node in the queue. | |
// Queue state. `head` and `tail` sit on separate cache lines (`Align64`)
// so the consumer (which writes `head`) and the producer (which writes
// `tail`) don't false-share.
#[repr(C, align(64))]
pub struct SpscQueue<T> {
    // Written only by the consumer (`pop_internal`); points at the current
    // dummy node, which is always present.
    head: Align64<AtomicPtr<SpscNode<T>>>,
    // Written only by the producer (`push_internal`); points at the most
    // recently pushed node.
    tail: Align64<AtomicPtr<SpscNode<T>>>,
    // Node recycler: only the producer allocates; the consumer merely marks
    // nodes free again.
    node_allocator: NodeAlloc<SpscNode<T>>,
}
impl<T> Drop for SpscQueue<T> { | |
fn drop(&mut self) { | |
while let Some(_) = self.pop() {} | |
debug_assert!(self.is_empty()); | |
let dummy = self.head.load(Acquire); | |
debug_assert!(!dummy.is_null()); | |
unsafe { | |
self.node_allocator.dealloc(NonNull::new_unchecked(dummy)); | |
self.head.store(core::ptr::null_mut(), Relaxed); | |
self.tail.store(core::ptr::null_mut(), Relaxed); | |
self.node_allocator.destroy(); | |
} | |
} | |
} | |
impl<T> SpscQueue<T> { | |
// FIXME: doesnt' need to return an Arc | |
#[inline] | |
pub fn new() -> Arc<Self> { | |
let this = Arc::new(Self { | |
head: Align64(AtomicPtr::new(0 as *mut _)), | |
tail: Align64(AtomicPtr::new(0 as *mut _)), | |
node_allocator: NodeAlloc::new(), | |
}); | |
// safety: `this` isn't shared yet, so it doesn't matter where `node_allocator` is used from | |
let stub = unsafe { | |
let stub = this.node_allocator.alloc(); | |
core::ptr::addr_of_mut!((*stub.as_ptr()).next).write(AtomicPtr::new(0 as *mut _)); | |
core::ptr::addr_of_mut!((*stub.as_ptr()).data) | |
.write(UnsafeCell::new(MaybeUninit::uninit())); | |
stub.as_ptr() | |
}; | |
this.head.store(stub, Relaxed); | |
this.tail.store(stub, Relaxed); | |
this | |
} | |
#[inline] | |
pub fn push(&self, v: T) { | |
unsafe { | |
let node = self.node_allocator.alloc(); | |
ptr::addr_of_mut!((*node.as_ptr()).next).write(AtomicPtr::new(0 as *mut _)); | |
ptr::addr_of_mut!((*node.as_ptr()).data).write(UnsafeCell::new(MaybeUninit::new(v))); | |
self.push_internal(node.as_ptr()); | |
} | |
} | |
#[inline] | |
pub fn is_empty(&self) -> bool { | |
let head = self.head.load(Acquire); | |
let tail = self.tail.load(Acquire); | |
core::ptr::eq(head, tail) | |
} | |
// node must be completely owned (unique access — just out of the alloc) | |
// and must be initialized with `Some` for `data` and null for `next` | |
#[inline] | |
unsafe fn push_internal(&self, node: *mut SpscNode<T>) { | |
// debug_assert!((*(*node).data.get()).is_some(), "caller should do this"); | |
debug_assert!((*node).next.load(Relaxed).is_null()); | |
// relaxed load is fine because we're the producer and thus we own it | |
let back = self.tail.load(Relaxed); | |
// as soon as i set `back.next` here, `node` is visible to the consumer, | |
// and no longer entirely ours. however, until we add another node, | |
// `(*node).next` is still ours, as it's the last item in the queue. | |
// | |
// release here ensures node.next and node.data are written before | |
// publishing it to other threads. | |
(*back).next.store(node, Release); | |
// now advance tail — again, tail is just for us, so relaxed is fine. | |
self.tail.store(node, Relaxed); | |
} | |
#[inline] | |
fn pop_internal(&self) -> Option<NonNull<SpscNode<T>>> { | |
unsafe { | |
// head is totally owned by us, so we can load relaxed. | |
let front = self.head.load(Relaxed); | |
// front is the current dummy, it is never null, and has no data. | |
debug_assert!(!front.is_null()); | |
// debug_assert!((*(*front).data.get()).is_none()); | |
// sync point with release store in `push` (if front == tail, we're sharing) | |
let next = (*front).next.load(Acquire); | |
if next.is_null() { | |
// empty | |
return None; | |
} | |
// - the node `next` is partially (mostly) owned by us. | |
// partial because producer owns `(*next).next`. | |
// - front will be returned, and is entirely owned by us. | |
debug_assert!(!core::ptr::eq(front, next)); | |
// debug_assert!(!(*(*next).data.get()).is_none()); | |
// copy our data into `front`. | |
core::ptr::swap_nonoverlapping((*next).data.get(), (*front).data.get(), 1); | |
// update our head — we own it. | |
self.head.store(next, Relaxed); | |
// next is now the dummy, return `front`. | |
let result = NonNull::new_unchecked(front); | |
// debug_assert!((*result.data.get()).is_some()); | |
Some(result) | |
} | |
} | |
#[inline] | |
pub fn pop(&self) -> Option<T> { | |
match self.pop_internal() { | |
None => None, | |
Some(node) => unsafe { | |
let ret = ptr::replace( | |
(*ptr::addr_of!((*node.as_ptr()).data)).get(), | |
MaybeUninit::uninit(), | |
) | |
.assume_init(); | |
self.node_allocator.dealloc(node); | |
Some(ret) | |
}, | |
} | |
} | |
} | |
// Queue node. `#[repr(C)]` keeps the layout predictable for `NodeAlloc`,
// which places the node directly after its own header in one allocation.
#[repr(C)]
struct SpscNode<T> {
    // Written by the producer (`push_internal`, Release); read by the
    // consumer (`pop_internal`, Acquire). Null means "last node in queue".
    next: AtomicPtr<SpscNode<T>>,
    // The payload slot. Uninitialized while this node is the dummy;
    // initialized otherwise.
    data: UnsafeCell<MaybeUninit<T>>,
    // Trailing padding so adjacent nodes don't share a cache line.
    _pad: Pad,
}
// 64 bytes of padding; never read, so it can stay uninitialized.
type Pad = MaybeUninit<[u8; 64]>;
// Transparent-ish wrapper that forces 64-byte (cache line) alignment on
// its contents; used to keep `head` and `tail` on separate lines.
#[repr(C, align(64))]
struct Align64<T>(T);
impl<T> core::ops::Deref for Align64<T> {
    type Target = T;
    #[inline]
    fn deref(&self) -> &T {
        &self.0
    }
}
#[inline] | |
pub fn spsc_channel<T>() -> (SpscSender<T>, SpscReceiver<T>) { | |
let q = SpscQueue::new(); | |
(SpscSender(q.clone()), SpscReceiver(q)) | |
} | |
// Producer handle: the only way to push. Not `Sync` and not `Clone`, which
// enforces the single-producer requirement.
pub struct SpscSender<T>(Arc<SpscQueue<T>>);
// SAFETY: sending the handle moves the producer role to another thread;
// the queue's algorithm only requires that at most one thread pushes at a
// time, which `!Sync` + `!Clone` preserve. Requires `T: Send` since values
// of `T` cross threads through the queue.
unsafe impl<T: Send> Send for SpscSender<T> {}
// Consumer handle: the only way to pop. Not `Sync` and not `Clone`, which
// enforces the single-consumer requirement.
pub struct SpscReceiver<T>(Arc<SpscQueue<T>>);
// SAFETY: same reasoning as for `SpscSender` — moving the handle moves the
// (unique) consumer role.
unsafe impl<T: Send> Send for SpscReceiver<T> {}
impl<T: Send> SpscSender<T> { | |
#[inline] | |
pub fn send(&self, v: T) { | |
self.0.push(v); | |
} | |
#[inline] | |
pub fn is_empty(&self) -> bool { | |
self.0.is_empty() | |
} | |
} | |
impl<T: Send> SpscReceiver<T> { | |
#[inline] | |
pub fn recv(&self) -> Option<T> { | |
self.0.pop() | |
} | |
#[inline] | |
pub fn is_empty(&self) -> bool { | |
self.0.is_empty() | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    // Single-threaded smoke test: FIFO ordering, emptiness tracking, and
    // interleaved push/pop. Uses the public handle API throughout.
    #[test]
    fn linear_test() {
        let (send, recv) = spsc_channel::<Box<i32>>();
        assert!(send.is_empty() && recv.is_empty());
        send.send(Box::new(1));
        assert!(!recv.is_empty());
        send.send(Box::new(2));
        assert!(!recv.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 1);
        assert!(!recv.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 2);
        assert!(recv.is_empty());
        assert_eq!(recv.recv(), None);
        send.send(Box::new(3));
        assert!(!recv.is_empty());
        send.send(Box::new(4));
        let v = *recv.recv().unwrap();
        assert_eq!(v, 3);
        assert!(!recv.is_empty());
        send.send(Box::new(5));
        let v = *recv.recv().unwrap();
        assert_eq!(v, 4);
        assert!(!recv.is_empty());
        let v = *recv.recv().unwrap();
        assert_eq!(v, 5);
        assert!(recv.is_empty());
        assert_eq!(recv.recv(), None);
    }
    // Two-thread race test driven by the `cobb` concurrency-test harness;
    // also runnable under miri (with far fewer iterations).
    #[test]
    fn cobb_test() {
        use std::cell::Cell;
        // Queue plus send/recv counters. `Cell` is !Sync, hence the unsafe
        // impls below; each counter is only touched by one thread per phase.
        struct RacyQueueState<T> {
            queue: Arc<SpscQueue<T>>,
            sends: Cell<usize>,
            recvs: Cell<usize>,
        }
        unsafe impl<T> Send for RacyQueueState<T> {}
        unsafe impl<T> Sync for RacyQueueState<T> {}
        // Miri is slow; keep the iteration count manageable there.
        const ITERATIONS: usize = if cfg!(miri) { 500 } else { 40000 };
        cobb::run_test(cobb::TestCfg {
            threads: 2,
            iterations: ITERATIONS,
            setup: || RacyQueueState::<Box<usize>> {
                queue: SpscQueue::new(),
                sends: Cell::new(0),
                recvs: Cell::new(0),
            },
            test: |state, tctx| {
                if tctx.thread_index() == 0 {
                    // Producer thread: push 4 tagged values.
                    for i in 0..4 {
                        state.queue.push(Box::new(100 * i + 10));
                        state.sends.set(1 + state.sends.get());
                    }
                } else {
                    // Consumer thread: drain whatever has been published so
                    // far, checking each value's tag is well-formed.
                    while let Some(v) = state.queue.pop() {
                        state.recvs.set(1 + state.recvs.get());
                        assert!((*v - 10) % 100 == 0);
                        assert!((0..4).contains(&((*v - 10) / 100)));
                    }
                }
            },
            before_each: |state| {
                assert!(state.queue.is_empty());
                state.sends.set(0);
                state.recvs.set(0);
            },
            after_each: |state| {
                // Drain anything the consumer thread didn't get to, then
                // check nothing was lost or duplicated.
                while let Some(v) = state.queue.pop() {
                    state.recvs.set(1 + state.recvs.get());
                    assert!((*v - 10) % 100 == 0);
                    assert!((0..4).contains(&((*v - 10) / 100)));
                }
                assert_eq!(state.sends.get(), state.recvs.get());
                state.sends.set(0);
                state.recvs.set(0);
            },
            ..Default::default()
        });
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use core::ptr; | |
use core::{ | |
cell::Cell, | |
ptr::NonNull, | |
sync::atomic::{Ordering::*, *}, | |
}; | |
// Shorthand: a nullable raw pointer, and an interior-mutable slot for one.
type Ptr<T> = Option<NonNull<T>>;
type CellPtr<T> = Cell<Ptr<T>>;
// Free-list node allocator. All fields are owned by the producer; the only
// thing a consumer calls is `allocator.dealloc(node)`, which is mostly to
// prove the allocator is still alive.
pub(crate) struct NodeAlloc<T> {
    // Most recently allocated/recycled node; `alloc` links new nodes here.
    head: CellPtr<NodeHeader<T>>,
    // Oldest node in the list; `alloc` checks it first for reuse once it
    // has been marked free.
    tail: CellPtr<NodeHeader<T>>,
    // Number of nodes ever heap-allocated; `destroy` checks it frees
    // exactly this many.
    count: Cell<usize>,
}
// node header without the data.
type NodeHeader<NodeTy> = NodeHeaderOptData<NodeTy, 0>;
// the full node header. used to compute layout for allocating and deallocating,
// but all actual use is via `NodeHeader<NodeTy>`.
type NodeHeaderWithData<NodeTy> = NodeHeaderOptData<NodeTy, 1>;
// NOTE(review): `alloc` computes the payload address as `header.add(1)`,
// i.e. it assumes `size_of::<NodeHeader<T>>()` equals the offset of `_data`
// inside `NodeHeaderWithData<T>`. The `[NodeTy; N]` field gives both
// variants `NodeTy`'s alignment, so this holds whenever the payload's
// alignment is at least pointer alignment (true for `SpscNode`, whose first
// field is an `AtomicPtr`) — confirm before reusing this allocator with
// smaller-aligned payload types.
#[repr(C)]
struct NodeHeaderOptData<NodeTy, const N: usize> {
    // owned by alloc(). never touched by dealloc
    next: CellPtr<NodeHeaderOptData<NodeTy, 0>>,
    // read by alloc and written by dealloc — could use LSB of `next` as a
    // "free" flag, but would slow stuff down.
    free: AtomicBool,
    // the payload. not owned by the allocator.
    _data: [NodeTy; N],
}
// Projects a node header pointer to its `free` flag.
//
// Safety: `n` must point to a live, initialized node header. `addr_of!` is
// used so no reference to the whole (possibly partly-uninit) node is
// created; the unconstrained lifetime `'a` is the caller's responsibility.
#[inline]
unsafe fn hdr_free<'a, T>(n: *mut NodeHeader<T>) -> &'a AtomicBool {
    &*ptr::addr_of!((*n).free)
}
// Projects a node header pointer to its `next` link.
//
// Safety: `n` must point to a live, initialized node header. `addr_of!` is
// used so no reference to the whole (possibly partly-uninit) node is
// created; the unconstrained lifetime `'a` is the caller's responsibility.
#[inline]
unsafe fn hdr_next<'a, T>(n: *mut NodeHeader<T>) -> &'a CellPtr<NodeHeader<T>> {
    &*ptr::addr_of!((*n).next)
}
impl<T> NodeAlloc<T> {
    // Creates an empty allocator; `const` so it needs no lazy init.
    #[inline]
    pub(crate) const fn new() -> Self {
        Self {
            head: Cell::new(None),
            tail: Cell::new(None),
            count: Cell::new(0),
        }
    }
    // must only be called from a single producer thread (at a time), which
    // should, for the most part, "own" the allocator.
    //
    // Returns a pointer to the *payload slot* (left uninitialized) of a node
    // whose header is recycled from the list when the oldest node is free,
    // or freshly heap-allocated otherwise.
    #[inline]
    pub(crate) unsafe fn alloc(&self) -> NonNull<T> {
        // Relaxed is fine here — we own head/tail (and could even use Cell i
        // think in this version of my algo).
        let node = match self.tail.get() {
            // if the tail's free, pop it off the free list. note: acquire
            // ensures we see anything that happened before it was marked free —
            // syncs with the release in dealloc
            Some(tail) if hdr_free(tail.as_ptr()).load(Acquire) => {
                self.tail.set(hdr_next(tail.as_ptr()).replace(None));
                // relaxed is fine because we own it now.
                hdr_free(tail.as_ptr()).store(false, Relaxed);
                tail
            }
            // no tail or tail not free: allocate fresh
            _ => {
                // One allocation covers the header *plus* one payload slot.
                let layout = alloc::alloc::Layout::new::<NodeHeaderWithData<T>>();
                let hopefully_ptr = alloc::alloc::alloc(layout).cast::<NodeHeader<T>>();
                let node_hdr = NonNull::new(hopefully_ptr).unwrap_or_else(|| {
                    alloc::alloc::handle_alloc_error(layout);
                });
                self.count.set(self.count.get() + 1);
                // Initialize only the header fields; the payload slot stays
                // uninitialized — the caller owns it.
                ptr::addr_of_mut!((*node_hdr.as_ptr()).next).write(Cell::new(None));
                ptr::addr_of_mut!((*node_hdr.as_ptr()).free).write(AtomicBool::new(false));
                node_hdr
            }
        };
        debug_assert!(!hdr_free(node.as_ptr()).load(Relaxed));
        debug_assert!(hdr_next(node.as_ptr()).get().is_none());
        // Link the node at the head of the list. The list threads through
        // *all* nodes (in use or free) in hand-out order, tail = oldest.
        if let Some(head) = self.head.get() {
            hdr_next(head.as_ptr()).set(Some(node));
        }
        if self.tail.get().is_none() {
            self.tail.set(Some(node));
        }
        self.head.set(Some(node));
        // offset and return value
        // NOTE(review): `add(1)` assumes `size_of::<NodeHeader<T>>()` equals
        // the payload offset in `NodeHeaderWithData<T>`; holds when
        // align_of::<T>() >= pointer alignment (true for `SpscNode`) —
        // confirm before reusing with other payload types.
        let ptr = node.as_ptr().add(1).cast::<T>();
        debug_assert!(!ptr.is_null());
        NonNull::new_unchecked(ptr)
    }
    // Marks the node free so the producer's `alloc` can recycle it. May be
    // called from the consumer thread; the node stays linked in the list
    // (`next` is untouched), and no memory is released here.
    #[inline]
    pub(crate) unsafe fn dealloc(&self, ptr: NonNull<T>) {
        // Step back from the payload to the header (inverse of `alloc`).
        let header = ptr.as_ptr().cast::<NodeHeader<T>>().sub(1);
        // we should not be marked free, since we were just in use.
        debug_assert!(!hdr_free(header).load(Relaxed));
        // Mark as free using release — this ensures no prior stores go past
        // here, but also syncs with acquire in alloc.
        hdr_free(header).store(true, Release);
        // That's all.
    }
    // Releases every heap allocation by walking the list from `tail`. Must
    // only run once no node is in use (all `dealloc`'d) and no other thread
    // can reach the allocator — e.g. from `SpscQueue::drop` (&mut self).
    #[inline]
    pub(crate) unsafe fn destroy(&mut self) {
        let count = self.count.get();
        // Fast path when nothing was ever allocated; debug builds still
        // walk the (empty) list so the count check below runs.
        if count == 0 && !cfg!(debug_assertions) {
            return;
        }
        let mut n = 0;
        let layout = alloc::alloc::Layout::new::<NodeHeaderWithData<T>>();
        while let Some(tail) = self.tail.get() {
            // Every node must have been marked free before destroy.
            debug_assert!(hdr_free(tail.as_ptr()).load(Relaxed));
            self.tail.set(hdr_next(tail.as_ptr()).replace(None));
            let ptr = tail.as_ptr().cast::<u8>();
            if cfg!(debug_assertions) && !cfg!(miri) {
                // miri can track UAF without our help
                ptr.write_bytes(0xfd, layout.size());
            }
            alloc::alloc::dealloc(ptr, layout);
            n += 1;
        }
        // Everything ever allocated must have been walked and freed.
        debug_assert_eq!(n, count);
        self.tail.set(None);
        self.head.set(None);
        self.count.set(0);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment