Skip to content

Instantly share code, notes, and snippets.

@wspeirs
Created January 14, 2022 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wspeirs/3709247dcd1b6660840eeefc2a3effd1 to your computer and use it in GitHub Desktop.
Save wspeirs/3709247dcd1b6660840eeefc2a3effd1 to your computer and use it in GitHub Desktop.
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
use log::{debug};
use rkyv::{Archive, Archived, Deserialize, out_field, RelPtr, Serialize};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::max;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::ptr::{copy_nonoverlapping};
use std::sync::atomic::{AtomicUsize, Ordering};
use rkyv::ser::{ScratchSpace, Serializer};
// Capacity requested by `ConcurrentVec::new()`. It is NOT a lower bound:
// `with_capacity` only rounds a requested size up to a minimum of 1.
const DEFAULT_SIZE :usize = 128; // default capacity of the vector
/// A growable vector whose mutating operations take `&self`.
///
/// The heap buffer is described by a raw pointer plus its capacity, both kept
/// behind a `parking_lot::RwLock`; the number of stored elements is tracked
/// separately in an atomic counter.
#[derive(Debug)]
pub struct ConcurrentVec<T> {
    /// `(buffer pointer, capacity in elements)`. Note that the second member
    /// is the capacity of the allocation, not the number of elements in it.
    ptr: RwLock<(*mut T, usize)>,
    /// Number of elements currently stored in the buffer.
    len: AtomicUsize,
}
// SAFETY: `ConcurrentVec<T>` owns its `T` values, so moving the container to
// another thread moves the elements with it; that is only sound when `T: Send`.
unsafe impl<T: Send> Send for ConcurrentVec<T> {}
// SAFETY: through a shared reference any thread can `push` a `T` (the value
// ends up owned by a container other threads can drop -> needs `T: Send`) and
// obtain `&T` via `get`/`iter` (-> needs `T: Sync`). Without these bounds a
// `ConcurrentVec<Rc<_>>` or `ConcurrentVec<Cell<_>>` could be shared across
// threads and cause data races.
unsafe impl<T: Send + Sync> Sync for ConcurrentVec<T> {}
impl<T> ConcurrentVec<T> {
    /// Create a new [ConcurrentVec] with the default capacity (`DEFAULT_SIZE`).
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_SIZE)
    }

    /// Create a new [ConcurrentVec] with the given capacity.
    ///
    /// A requested capacity of zero is rounded up to one so that the vector
    /// always owns a real allocation.
    ///
    /// # Panics
    ///
    /// Panics if `T` is zero-sized or if the requested capacity overflows the
    /// maximum layout size; aborts via `handle_alloc_error` if the allocator
    /// fails.
    pub fn with_capacity(size: usize) -> Self {
        // always want to ensure we have at _least_ 1 element
        let size = max(1, size);
        ConcurrentVec {
            ptr: RwLock::new((Self::allocate(size), size)),
            len: AtomicUsize::new(0),
        }
    }

    /// Creates a [ConcurrentVec] with a single element
    pub fn from_value(value: T) -> Self {
        let ret = Self::with_capacity(1);
        ret.push(value);
        ret
    }

    /// Creates a new [ConcurrentVec] from a Vec, taking over its allocation.
    ///
    /// A `Vec` without an allocation (capacity 0) is replaced by a fresh
    /// one-element buffer, because every other method assumes the stored
    /// pointer refers to live memory of `capacity` elements.
    ///
    /// # Panics
    ///
    /// Panics if `T` is zero-sized.
    pub fn from(vec: Vec<T>) -> Self {
        Self::assert_not_zero_sized();
        // The buffer is adopted, so the Vec must not free it.
        let mut vec = ManuallyDrop::new(vec);
        let len = vec.len();
        let (ptr, capacity) = match vec.capacity() {
            // Nothing was allocated: the Vec's pointer is dangling and must
            // never reach `dealloc`.
            0 => (Self::allocate(1), 1),
            capacity => (vec.as_mut_ptr(), capacity),
        };
        ConcurrentVec {
            ptr: RwLock::new((ptr, capacity)),
            len: AtomicUsize::new(len),
        }
    }

    /// Get the current length of the [ConcurrentVec]
    ///
    /// The load uses `Acquire` ordering and pairs with the `Release` store in
    /// [`push`](Self::push): an element counted by the returned length has
    /// been fully written.
    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }

    /// Clears the vector, dropping all the values and releasing the buffer.
    ///
    /// The old buffer is swapped for a minimal one-element allocation, so the
    /// stored pointer always refers to live memory.
    pub fn clear(&self) {
        // Allocate before touching any state: a failure here leaves the
        // vector exactly as it was.
        let fresh = Self::allocate(1);
        let mut guard = self.ptr.write();
        let (ptr, capacity) = guard.deref_mut();
        // Publish the empty state first; should an element destructor panic
        // below, the remaining elements leak instead of being dropped twice.
        let len = self.len.swap(0, Ordering::Release);
        let old_ptr = std::mem::replace(ptr, fresh);
        let old_capacity = std::mem::replace(capacity, 1);
        // SAFETY: the write lock excludes every reader and pusher, the first
        // `len` slots of the old buffer are initialised, and the old buffer
        // was allocated for exactly `old_capacity` elements.
        unsafe {
            std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(old_ptr, len));
            Self::release(old_ptr, old_capacity);
        }
    }

    /// Adds a new value to the end of the vector
    /// Returns the index the item was pushed into
    ///
    /// The index is chosen while the upgradable read lock is held. Only one
    /// upgradable reader can exist at a time, so pushes are serialised: the
    /// index always equals the current length, it can never lie beyond a
    /// single growth step, and `clear` cannot run between reserving the slot
    /// and writing it. The length is published only after the value has been
    /// written, so readers never observe an uninitialised slot.
    ///
    /// # Panics
    ///
    /// Panics if the grown capacity overflows.
    pub fn push(&self, value: T) -> usize {
        let guard = self.ptr.upgradable_read();
        // Relaxed suffices: every writer of `len` holds a lock that conflicts
        // with the one we hold, and the lock hand-over orders the accesses.
        let idx = self.len.load(Ordering::Relaxed);
        let (ptr, capacity) = *guard;

        if idx < capacity {
            // SAFETY: `idx < capacity`, the slot is uninitialised, and plain
            // readers only touch slots below the published length.
            unsafe { std::ptr::write(ptr.add(idx), value) };
            self.len.store(idx + 1, Ordering::Release);
        } else {
            // Out of room: upgrade to exclusive access. The upgrade is atomic,
            // so `ptr`/`capacity` read above are still current.
            let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
            let doubled = capacity.checked_mul(2).expect("Capacity overflow");
            let new_capacity = max(doubled, idx + 1);
            let new_ptr = Self::allocate(new_capacity);
            // SAFETY: exactly `idx` slots of the old buffer are initialised,
            // the two buffers are distinct allocations, `idx < new_capacity`,
            // and nobody else can access the old buffer under the write lock.
            unsafe {
                copy_nonoverlapping::<T>(ptr as *const T, new_ptr, idx);
                Self::release(ptr, capacity);
                std::ptr::write(new_ptr.add(idx), value);
            }
            *guard = (new_ptr, new_capacity);
            // Publish while still holding the lock so the next pusher sees
            // the updated length.
            self.len.store(idx + 1, Ordering::Release);
        }
        idx
    }

    /// Gets an item in the [ConcurrentVec] and provides it to the function.
    ///
    /// The read lock is held while `f` runs. Calling [`push`](Self::push) or
    /// [`clear`](Self::clear) on the same vector from inside `f` may deadlock,
    /// because both can need exclusive access.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not smaller than the current length.
    pub fn get<F: FnOnce(&T)>(&self, index: usize, f: F) {
        let guard = self.ptr.read();
        let (ptr, _) = guard.deref();
        let len = self.len();
        // `index == len` is already one past the last initialised element.
        if index >= len {
            panic!("Attempt to access out-of-bound index: {} >= {}", index, len);
        }
        // SAFETY: `index < len`, every slot below `len` is initialised, and
        // the read lock keeps the buffer from being moved or freed.
        unsafe { f(&*ptr.add(index)) }
    }

    /// Consumes the [ConcurrentVec] and returns a Vec that reuses its buffer.
    pub fn into_vec(self) -> Vec<T> {
        // Ownership of the buffer moves into the Vec; our Drop must not run.
        let mut cv = ManuallyDrop::new(self);
        // We own the vector, so no locking is needed.
        let (ptr, capacity) = *cv.ptr.get_mut();
        let len = *cv.len.get_mut();
        // SAFETY: the buffer came from the global allocator with the layout of
        // `[T; capacity]`, `capacity >= 1`, and its first `len` slots are
        // initialised.
        unsafe { Vec::from_raw_parts(ptr, len, capacity) }
    }

    /// Gets an iterator over the values in the [ConcurrentVec]
    ///
    /// The iterator covers the elements present when it is created and holds
    /// the read lock for as long as it is alive.
    ///
    /// NOTE(review): the yielded references are typed with the lifetime of the
    /// `&self` borrow, not of the iterator. A reference kept after the iterator
    /// is dropped is no longer protected by the lock and can dangle once
    /// another thread grows or clears the vector.
    pub fn iter(&self) -> ConcurrentVecIterator<'_, T> {
        let guard = self.ptr.read();
        let (ptr, _) = guard.deref();
        let len = self.len();
        ConcurrentVecIterator {
            ptr: *ptr as *const T,
            idx: 0,
            len,
            _guard: guard,
        }
    }

    /// Rejects zero-sized element types, for which the global allocator must
    /// not be called.
    fn assert_not_zero_sized() {
        assert!(
            std::mem::size_of::<T>() != 0,
            "ConcurrentVec does not support zero-sized types"
        );
    }

    /// Allocates an uninitialised buffer for `capacity` (non-zero) elements.
    fn allocate(capacity: usize) -> *mut T {
        Self::assert_not_zero_sized();
        assert!(capacity != 0, "ConcurrentVec buffers hold at least one element");
        let layout = Layout::array::<T>(capacity).expect("Error creating layout");
        // SAFETY: the layout has a non-zero size (checked just above).
        let ptr = unsafe { alloc(layout) } as *mut T;
        if ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        ptr
    }

    /// Frees a buffer previously obtained for `capacity` elements.
    ///
    /// # Safety
    ///
    /// `ptr` must have been allocated by the global allocator with the layout
    /// of `[T; capacity]` and must not be used afterwards. Zero-size layouts
    /// are ignored because they never correspond to a real allocation.
    unsafe fn release(ptr: *mut T, capacity: usize) {
        let layout = Layout::array::<T>(capacity).expect("Error creating layout");
        if layout.size() != 0 {
            // SAFETY: guaranteed by the caller, see above.
            unsafe { dealloc(ptr as *mut u8, layout) };
        }
    }
}
impl<T> Drop for ConcurrentVec<T> {
    /// Drops every stored element and then releases the buffer.
    fn drop(&mut self) {
        let len = self.len();
        // `&mut self` proves exclusive access, so the lock can be bypassed.
        let (ptr, capacity) = *self.ptr.get_mut();
        let layout = Layout::array::<T>(capacity).expect("Error creating layout");
        // SAFETY: the first `len` slots are initialised and nothing else can
        // reach the buffer any more.
        unsafe {
            // drop all the values
            std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(ptr, len));
            // Release the memory. A zero-size layout means there is no live
            // allocation behind `ptr` (capacity 0, e.g. after `clear` or when
            // built from an empty `Vec`); handing such a pointer to `dealloc`
            // would be a double or invalid free.
            if layout.size() != 0 {
                dealloc(ptr as *mut u8, layout);
            }
        }
    }
}
/// Iterator over the elements of a `ConcurrentVec`.
///
/// It carries a read guard on the vector's pointer/capacity pair for its whole
/// lifetime.
pub struct ConcurrentVecIterator<'a, T> {
    /// Start of the element buffer.
    ptr: *const T,
    /// Index of the next element to yield.
    idx: usize,
    /// Iteration stops once `idx` reaches this value.
    len: usize,
    /// Read guard stored alongside the pointer; never read, only held.
    _guard: RwLockReadGuard<'a, (*mut T, usize)>,
}
impl<'a, T> Iterator for ConcurrentVecIterator<'a, T> {
    type Item = &'a T;

    /// Yields a reference to the element at the cursor and advances it, or
    /// `None` once the cursor has reached `len`.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.idx;
        if current < self.len {
            self.idx = current + 1;
            // Offsets below `len` are the ones this iterator is allowed to
            // dereference.
            Some(unsafe { &*self.ptr.add(current) })
        } else {
            None
        }
    }
}
/// Archived (rkyv) representation of a `ConcurrentVec`.
#[repr(C)]
pub struct ArchivedConcurrentVec<T> {
    /// Relative pointer emplaced by `Archive::resolve` from the resolver's
    /// position.
    ptr: RelPtr<T>,
    /// Archived element count written by `Archive::resolve`.
    len: Archived<usize>,
}
pub struct ConcurrentVecResolver {
pos: usize,
}
impl <T: Archive> Archive for ConcurrentVec<T> {
type Archived = ArchivedConcurrentVec<T::Archived>;
type Resolver = ConcurrentVecResolver;
#[inline]
unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) {
let (fp, fo) = out_field!(out.ptr);
RelPtr::emplace(pos + fp, resolver.pos, fo);
let (fp, fo) = out_field!(out.len);
let len = self.len();
usize::resolve(&len, pos + fp, (), fo);
}
}
impl<T: Serialize<S>, S: ScratchSpace + Serializer + ?Sized> Serialize<S> for ConcurrentVec<T> {
#[inline]
fn serialize(&self, serializer: &mut S) -> Result<Self::Resolver, S::Error> {
let gaurd = self.ptr.read();
let (ptr, capacity) = gaurd.deref();
Ok(ConcurrentVecResolver {
pos: (*ptr).serialize_unsized(serializer)?
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment