Skip to content

Instantly share code, notes, and snippets.

@Aatch
Created May 12, 2013 04:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aatch/5562415 to your computer and use it in GitHub Desktop.
Save Aatch/5562415 to your computer and use it in GitHub Desktop.
Chase & Lev work-stealing algorithm in Rust
use core::unstable::intrinsics::{init, atomic_cxchg, atomic_xchg};
/**
* Implementation of the Chase & Lev Work-Stealing deque.
*
* This requires using owned pointers to data in order to ensure that the data is freed at the
* right time and place. It also allows some of the operations to be atomic, which would not be
* possible if the data was bigger than a pointer.
*
* One key difference from Chase & Lev is that this implementation zeroes out a slot in the
* circular buffer when its element is taken. Reading a zeroed slot therefore indicates a race
* (that was lost), so that is checked before bumping up top.
*
* The code uses a lot of unsafe code and therefore isn't appropriate for general usage.
*
*/
/*
* TODO: Use a better memory-management scheme, copying as we are now is not optimal.
*/
/// A work-stealing deque: two indices plus the circular buffer they index into.
pub struct WorkQueue<T> {
// Index of the oldest element. `steal` takes from this end and advances it via `cas_top`;
// `pop` also advances it via `cas_top` when it removes the last remaining element.
priv top : u64,
// Index one past the newest element. `push` stores at `bottom` and then increments it;
// `pop` decrements it and takes from the new value.
priv bottom : u64,
// Current backing storage. Replaced wholesale by the result of `grow` (in `push`)
// or `shrink` (in `try_shrink`).
priv active_buffer: WorkBuffer<T>
}
/// Backing storage for a `WorkQueue`: a vector of owned pointers used as a circular buffer.
pub struct WorkBuffer<T> {
// Slots are addressed through `wrap`. `new` fills them with `init()` (zero, per the comment
// there) and `take` exchanges a slot with 0, so a zero slot holds no element.
priv buf: ~[~T]
}
/// Outcome of trying to remove an element from a `WorkQueue`.
#[deriving(Eq)]
pub enum QueueResult<T> {
// No element was obtained. Returned by `pop` and `steal` when the computed size shows
// nothing to take; `pop` also returns it when its CAS on `top` fails.
Empty,
// Returned by `steal` when the slot it took was already zero or its CAS on `top` failed,
// i.e. the element went to someone else.
Abort,
// The element that was removed.
Have(T)
}
/// Gets the numerical value of the actual owned pointer, instead of trying
/// to go to the header offset (and triggering a segfault).
///
/// The reference `a` is reinterpreted as a reference to a `uint` and read through,
/// so the result is the pointer stored in `a` viewed as an integer. `steal` compares
/// it against 0 to detect a slot that had already been emptied.
fn owned_ptr_val<T>(a : &~T) -> uint {
unsafe {
// Reinterpret `&~T` as `&uint` so the pointer itself is read, not the pointee.
// NOTE(review): assumes `~T` is exactly one `uint` wide — confirm for the target compiler.
let p : &uint = cast::transmute(a);
*p
}
}
pub impl<T> WorkQueue<T> {
fn new() -> WorkQueue<T> {
static INIT_QUEUE_SIZE : uint = 64;
WorkQueue {
top: 0,
bottom: 0,
active_buffer: WorkBuffer::new(INIT_QUEUE_SIZE)
}
}
fn push(&mut self, o:~T) {
let b = self.bottom;
let t = self.top;
let size = (b - t) as uint;
if size >= self.active_buffer.len()-1 {
self.active_buffer = self.active_buffer.grow(b,t);
}
self.active_buffer.put(b, o);
self.bottom = b+1;
}
fn pop(&mut self) -> QueueResult<~T> {
let b = self.bottom - 1;
let t, o, size;
{ // scoping because borrowck
{
let buf = &mut self.active_buffer;
self.bottom = b;
t = self.top;
size = (b - t) as int;
if size < 0 {
self.bottom = t;
return Empty;
}
o = buf.take(b);
}
if size > 0 {
self.try_shrink(b, t);
return Have(o);
}
};
let val = if !self.cas_top(t, t+1) {
Empty
} else {
Have(o)
};
self.bottom = t+1;
return val;
}
fn steal(&mut self) -> QueueResult<~T> {
let t = self.top;
let b = self.bottom;
let size = (b - t) as int;
if size <= 0 {
return Empty;
}
let o = self.active_buffer.take(t);
// The original just uses the the cas to check if it worked,
// but because we actually take the value, a race can also be
// detected when we get a zero value.
if owned_ptr_val(&o) != 0 && self.cas_top(t, t+1) {
Have(o)
} else {
Abort
}
}
priv fn cas_top(&mut self, old:u64, new:u64) -> bool {
let old = old as int;
let new = new as int;
unsafe {
atomic_cxchg(cast::transmute(&mut self.top), old, new) == old
}
}
priv fn try_shrink(&mut self, bot:u64, top:u64) {
let size = (bot - top) as uint;
if size < (self.active_buffer.len()/3) { // 3 is the K from the paper, K <= 3
self.active_buffer = self.active_buffer.shrink(bot, top);
}
}
}
pub impl<T> WorkBuffer<T> {
/// Creates a buffer with `size` slots, each produced by `init()`.
///
/// NOTE(review): `wrap` masks indices with `len - 1`, which presumably requires `size`
/// to be a non-zero power of two — nothing here enforces it; confirm against callers.
fn new(size:uint) -> WorkBuffer<T> {
// Initialize the buffer to 0
let buf = vec::from_fn(size, |_| init());
WorkBuffer { buf:buf }
}
/**
* Takes the element from the buffer. This is unsafe
* because there may not be a valid element at the location.
*
* The slot for `idx` (after `wrap`) is atomically exchanged with 0 and its previous
* contents are returned reinterpreted as a `~T`, so the result is a zero pointer if
* the slot had already been emptied.
*/
unsafe fn take(&mut self, idx:u64) -> ~T {
let i = self.wrap(idx);
// This effectively pretends that we are just
// moving a value from some location, not moving
// it from inside a vector
do vec::as_mut_buf(self.buf) |p,_| {
// Address of slot `i`, reinterpreted as the pointer type `atomic_xchg` expects.
let p = cast::transmute(ptr::mut_offset(p, i));
// Swap in 0 and hand back whatever was stored there.
cast::transmute(atomic_xchg(p, 0))
}
}
/// Stores `t` in the slot for `idx` (after `wrap`) via `unsafe_set`.
///
/// NOTE(review): presumably overwrites the slot without dropping its previous
/// contents, so the slot is expected to be empty — confirm `unsafe_set` semantics.
unsafe fn put(&mut self, idx:u64, t:~T) {
let i = self.wrap(idx);
self.buf.unsafe_set(i, t);
}
/// Number of slots in the buffer.
fn len(&self) -> uint {
self.buf.len()
}
/// Builds a buffer twice as long (`len << 1`) and moves into it every element at the
/// indices yielded by `u64::range(top, bot)`, taking each one out of `self`.
/// Elements keep their queue index; only the wrapped slot position changes.
fn grow(&mut self, bot:u64, top:u64) -> WorkBuffer<T> {
debug!("Growing Buffer: %u -> %u", top as uint, bot as uint);
let mut buf = WorkBuffer::new(self.len() << 1);
for u64::range(top, bot) |i| {
buf.put(i, self.take(i));
}
buf
}
/// Builds a buffer half as long (`len >> 1`) and moves into it every element at the
/// indices yielded by `u64::range(top, bot)`, taking each one out of `self`.
fn shrink(&mut self, bot:u64, top:u64) -> WorkBuffer<T> {
debug!("Shrinking Buffer: %u -> %u", top as uint, bot as uint);
let mut buf = WorkBuffer::new(self.len() >> 1);
for u64::range(top, bot) |i| {
buf.put(i, self.take(i));
}
buf
}
/// Maps a queue index to a slot index by masking with `len - 1`.
priv fn wrap(&self, i:u64) -> uint {
let l = self.len() as u64;
(i & (l-1)) as uint
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment