Created
January 14, 2022 15:06
-
-
Save wspeirs/3709247dcd1b6660840eeefc2a3effd1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use log::debug;
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
use rkyv::ser::{ScratchSpace, Serializer};
use rkyv::{out_field, Archive, Archived, Deserialize, RelPtr, Serialize, SerializeUnsized};
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::cmp::max;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::ptr::{copy_nonoverlapping, NonNull};
use std::sync::atomic::{AtomicUsize, Ordering};
// Capacity (in elements) that `ConcurrentVec::new()` passes to `with_capacity`.
// It is not a lower bound: `with_capacity` only clamps the request to at least 1.
const DEFAULT_SIZE :usize = 128;
#[derive(Debug)] | |
pub struct ConcurrentVec<T> { | |
ptr: RwLock<(*mut T, usize)>, // pointer and length | |
len: AtomicUsize | |
} | |
unsafe impl <T> Send for ConcurrentVec<T> {} | |
unsafe impl <T> Sync for ConcurrentVec<T> {} | |
impl <T> ConcurrentVec<T> { | |
/// Create a new [ConcurrentVec] | |
pub fn new() -> Self { | |
Self::with_capacity(DEFAULT_SIZE) | |
} | |
/// Create a new [ConcurrentVec] with the given capacity | |
pub fn with_capacity(size :usize) -> Self { | |
// always want to ensure we have at _least_ 1 element | |
let size = max(1, size); | |
unsafe { | |
// make the pointer with capacity | |
let layout = Layout::array::<T>(size).expect("Error creating layout"); | |
let ptr = alloc(layout) as *mut T; | |
ConcurrentVec { | |
ptr: RwLock::new((ptr, size)), | |
len: AtomicUsize::new(0) | |
} | |
} | |
} | |
/// Creates a [ConcurrentVec] with a single element | |
pub fn from_value(value: T) -> Self { | |
let ret = Self::with_capacity(1); | |
ret.push(value); | |
ret | |
} | |
/// Creates a new [ConcurrentVec] from a Vec | |
pub fn from(vec :Vec<T>) -> Self { | |
let mut vec = ManuallyDrop::new(vec); | |
let ptr = vec.as_mut_ptr(); | |
let len = vec.len(); | |
let capacity = vec.capacity(); | |
ConcurrentVec { | |
ptr: RwLock::new((ptr, capacity) ), | |
len: AtomicUsize::new(len) | |
} | |
} | |
/// Get the current length of the [ConcurrentVec] | |
#[inline] | |
pub fn len(&self) -> usize { | |
self.len.load(Ordering::Relaxed) | |
} | |
/// Clears the vector, dropping all the values, and removing all the memory | |
pub fn clear(&self) { | |
let mut write_guard = self.ptr.write(); | |
let (ptr, capacity) = write_guard.deref_mut(); | |
let len = self.len(); | |
unsafe { | |
// drop all the values | |
std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(*ptr, len)); | |
// release the memory | |
let layout = Layout::array::<T>(*capacity).expect("Error creating layout"); | |
dealloc(*ptr as *mut u8, layout); | |
} | |
// update the capacity and the length | |
self.len.store(0, Ordering::Relaxed); | |
*capacity = 0; | |
} | |
/// Adds a new value to the end of the vector | |
/// Returns the index the item was pushed into | |
pub fn push(&self, value :T) -> usize { | |
// get the location of where this value will live first | |
let idx = self.len.fetch_add(1, Ordering::Relaxed); | |
// now attempt to get a read-protected guard for the pointer & capacity | |
let guard = self.ptr.upgradable_read(); | |
let (ptr, capacity) = guard.deref(); | |
// debug!("IN PTR: {:p} IDX: {} CAP: {}", *ptr, idx, capacity); | |
// check to see if we have capacity for this location | |
if idx >= *capacity { | |
// need more capacity, so upgrade our lock | |
let mut guard = RwLockUpgradableReadGuard::upgrade(guard); | |
let (ptr, capacity) = guard.deref_mut(); | |
// double-check we still need to increase capacity, | |
// another thread might have already increased capacity | |
if *capacity <= idx { | |
// check to see if the capacity is zero, this happens after a clear | |
if *capacity == 0 { | |
*capacity = idx + 1; // just enough for our current idx | |
unsafe { | |
// allocate new memory right into the ptr | |
let layout = Layout::array::<T>(*capacity).expect("Error creating layout"); | |
*ptr = alloc(layout) as *mut T; | |
// set the value | |
std::ptr::write(ptr.add(idx), value); | |
} | |
} else { | |
let new_capacity = *capacity * 2; // exponential increase | |
// debug!("ALLOCATING: {} -> {}", *capacity, new_capacity); | |
unsafe { | |
// allocate new memory | |
let layout = Layout::array::<T>(new_capacity).expect("Error creating layout"); | |
let new_ptr = alloc(layout) as *mut T; | |
// move the items over | |
copy_nonoverlapping::<T>(*ptr as *const T, new_ptr, *capacity); | |
// drop the old memory | |
let old_layout = Layout::array::<T>(*capacity).expect("Error creating layout"); | |
dealloc(*ptr as *mut u8, old_layout); | |
// update the ptr and capacity | |
*ptr = new_ptr; | |
*capacity = new_capacity; | |
// set the value | |
std::ptr::write(ptr.add(idx), value); | |
} | |
} | |
} else { | |
// we have enough capacity, just set the value | |
unsafe { std::ptr::write(ptr.add(idx), value); } | |
} | |
} else { | |
unsafe { std::ptr::write(ptr.add(idx), value); } | |
} | |
idx | |
} | |
/// Gets an item in the [ConcurrentVec] and provides it to the function. | |
pub fn get<F: FnOnce(&T)>(&self, index: usize, f :F) { | |
let guard = self.ptr.read(); | |
let (ptr, _) = guard.deref(); | |
let len = self.len(); | |
if len < index { | |
panic!("Attempt to modify out-of-bound index: {} > {}", index, len); | |
} | |
// call the function on the item | |
unsafe { f(&*ptr.add(index)); } | |
} | |
/// Consumes the [ConcurrentVec] and returns a Vec | |
pub fn into_vec(self) -> Vec<T> { | |
let cv = ManuallyDrop::new(self); | |
let guard = cv.ptr.read(); | |
let (ptr, capacity) = guard.deref(); | |
let len = cv.len(); | |
unsafe { Vec::from_raw_parts(*ptr, len, *capacity) } | |
} | |
/// Gets an iterator over the values in the [ConcurrentVec] | |
pub fn iter(&self) -> ConcurrentVecIterator<T> { | |
let guard = self.ptr.read(); | |
let (ptr, _) = guard.deref(); | |
let len = self.len(); | |
ConcurrentVecIterator { | |
ptr: *ptr as *const T, | |
idx: 0, | |
len, | |
_guard: guard | |
} | |
} | |
} | |
impl <T> Drop for ConcurrentVec<T> { | |
fn drop(&mut self) { | |
let len = self.len(); | |
let (ptr, capacity) = self.ptr.get_mut(); | |
unsafe { | |
// drop all the values | |
std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(*ptr, len)); | |
// release the memory | |
let layout = Layout::array::<T>(*capacity).expect("Error creating layout"); | |
dealloc(*ptr as *mut u8, layout); | |
} | |
} | |
} | |
pub struct ConcurrentVecIterator<'a, T> { | |
ptr: *const T, | |
idx: usize, | |
len: usize, | |
_guard: RwLockReadGuard<'a, (*mut T, usize)> | |
} | |
impl <'a, T> Iterator for ConcurrentVecIterator<'a, T> { | |
type Item = &'a T; | |
fn next(&mut self) -> Option<Self::Item> { | |
if self.idx >= self.len { | |
return None; | |
} | |
let ret = unsafe { &*self.ptr.add(self.idx) }; | |
self.idx += 1; | |
Some(ret) | |
} | |
} | |
#[repr(C)] | |
pub struct ArchivedConcurrentVec<T> { | |
ptr: RelPtr<T>, | |
len: Archived<usize>, | |
} | |
/// Resolver carried from `Serialize::serialize` to `Archive::resolve` for a
/// [ConcurrentVec].
pub struct ConcurrentVecResolver {
    /// Position returned by the serializer; used as the target of the archived
    /// relative pointer.
    pos: usize,
}
impl <T: Archive> Archive for ConcurrentVec<T> { | |
type Archived = ArchivedConcurrentVec<T::Archived>; | |
type Resolver = ConcurrentVecResolver; | |
#[inline] | |
unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { | |
let (fp, fo) = out_field!(out.ptr); | |
RelPtr::emplace(pos + fp, resolver.pos, fo); | |
let (fp, fo) = out_field!(out.len); | |
let len = self.len(); | |
usize::resolve(&len, pos + fp, (), fo); | |
} | |
} | |
impl<T: Serialize<S>, S: ScratchSpace + Serializer + ?Sized> Serialize<S> for ConcurrentVec<T> { | |
#[inline] | |
fn serialize(&self, serializer: &mut S) -> Result<Self::Resolver, S::Error> { | |
let gaurd = self.ptr.read(); | |
let (ptr, capacity) = gaurd.deref(); | |
Ok(ConcurrentVecResolver { | |
pos: (*ptr).serialize_unsized(serializer)? | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment