Skip to content

Instantly share code, notes, and snippets.

@Plecra
Created April 7, 2021 11:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Plecra/5598c625a43ecdbdc9c19e3a2b2fda59 to your computer and use it in GitHub Desktop.
Save Plecra/5598c625a43ecdbdc9c19e3a2b2fda59 to your computer and use it in GitHub Desktop.
Rust Concurrent Monotonic list
use core::ptr;
use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering};
use std::alloc;
#[cfg(not(target_pointer_width = "32"))]
const DIVISIONS: usize = 24;
#[cfg(target_pointer_width = "32")]
const DIVISIONS: usize = 12;
struct MonotonicVec<T> {
filled: AtomicUsize,
claimed: AtomicUsize,
// the length of each division is 8 * (6.pow(i))
divisions: [AtomicPtr<T>; DIVISIONS],
}
impl<T> MonotonicVec<T> {
#[allow(clippy::declare_interior_mutable_const)]
const NULL: AtomicPtr<T> = AtomicPtr::new(ptr::null_mut());
pub const fn new() -> Self {
Self {
filled: AtomicUsize::new(0),
claimed: AtomicUsize::new(0),
divisions: [Self::NULL; DIVISIONS],
}
}
fn division_len(division_n: usize) -> usize {
8 * 6usize.pow(division_n as u32)
}
fn split_index(mut index: usize) -> (usize, usize) {
let mut division = 0;
while let Some(rem) = index.checked_sub(Self::division_len(division)) {
division += 1;
index = rem;
}
(division, index)
}
pub fn push(&self, value: T) -> &T {
let i = self.claimed.fetch_add(1, Ordering::SeqCst);
let (division, index) = Self::split_index(i);
// we are responsible for allocating the division
let el = if index == 0 {
let layout = alloc::Layout::array::<T>(Self::division_len(division)).unwrap();
let ptr = unsafe { alloc::alloc(layout) } as *mut T;
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
self.divisions[division].store(ptr, Ordering::Release);
ptr
} else {
let division = loop {
let ptr = self.divisions[division].load(Ordering::Relaxed);
if ptr.is_null() {
core::hint::spin_loop()
} else {
fence(Ordering::Acquire);
break ptr;
}
};
unsafe { division.add(index) }
};
let el = unsafe {
ptr::write(el, value);
&*el
};
while self
.filled
.compare_exchange_weak(i, i + 1, Ordering::Release, Ordering::Relaxed)
.is_err()
{
core::hint::spin_loop();
}
el
}
pub fn iter(&self) -> Iter<'_, T> {
Iter {
start: 0,
end: self.filled.load(Ordering::Acquire),
divisions: &self.divisions,
}
}
}
pub struct Iter<'a, T> {
start: usize,
end: usize,
divisions: &'a [AtomicPtr<T>; DIVISIONS],
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if self.start == self.end {
return None;
}
let (division, index) = MonotonicVec::<T>::split_index(self.start);
self.start += 1;
let div = &self.divisions[division];
let div = div as *const AtomicPtr<T> as *const *const T
Some(unsafe {
&*(*(&self.divisions[division] as *const AtomicPtr<T> as *const *const T)).add(index)
})
}
}
impl<'a, T> IntoIterator for &'a MonotonicVec<T> {
type Item = &'a T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment