Created
May 12, 2013 04:31
-
-
Save Aatch/5562415 to your computer and use it in GitHub Desktop.
Chase & Lev work-stealing algorithm in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use core::unstable::intrinsics::{init, atomic_cxchg, atomic_xchg}; | |
/**
 * Implementation of the Chase & Lev work-stealing deque.
 *
 * This requires using owned pointers to data in order to ensure that the data is freed at the
 * right time and place. It also allows some of the operations to be atomic, which would not be
 * possible if the data were bigger than a pointer.
 *
 * One key difference from Chase & Lev is that this implementation zeroes out the location in
 * the circular buffer when an element is taken. Reading a zero therefore also indicates a race
 * (that was lost), so that is checked before bumping up top.
 *
 * The code uses a lot of unsafe code and therefore isn't appropriate for general usage.
 *
 */
/* | |
* TODO: Use a better memory-management scheme, copying as we are now is not optimal. | |
*/ | |
pub struct WorkQueue<T> { | |
priv top : u64, | |
priv bottom : u64, | |
priv active_buffer: WorkBuffer<T> | |
} | |
pub struct WorkBuffer<T> { | |
priv buf: ~[~T] | |
} | |
#[deriving(Eq)] | |
pub enum QueueResult<T> { | |
Empty, | |
Abort, | |
Have(T) | |
} | |
/// Gets the numerical value of the actual owned pointer, instead of trying | |
/// to go to the header offset (and triggering a segfault) | |
fn owned_ptr_val<T>(a : &~T) -> uint { | |
unsafe { | |
let p : &uint = cast::transmute(a); | |
*p | |
} | |
} | |
pub impl<T> WorkQueue<T> { | |
fn new() -> WorkQueue<T> { | |
static INIT_QUEUE_SIZE : uint = 64; | |
WorkQueue { | |
top: 0, | |
bottom: 0, | |
active_buffer: WorkBuffer::new(INIT_QUEUE_SIZE) | |
} | |
} | |
fn push(&mut self, o:~T) { | |
let b = self.bottom; | |
let t = self.top; | |
let size = (b - t) as uint; | |
if size >= self.active_buffer.len()-1 { | |
self.active_buffer = self.active_buffer.grow(b,t); | |
} | |
self.active_buffer.put(b, o); | |
self.bottom = b+1; | |
} | |
fn pop(&mut self) -> QueueResult<~T> { | |
let b = self.bottom - 1; | |
let t, o, size; | |
{ // scoping because borrowck | |
{ | |
let buf = &mut self.active_buffer; | |
self.bottom = b; | |
t = self.top; | |
size = (b - t) as int; | |
if size < 0 { | |
self.bottom = t; | |
return Empty; | |
} | |
o = buf.take(b); | |
} | |
if size > 0 { | |
self.try_shrink(b, t); | |
return Have(o); | |
} | |
}; | |
let val = if !self.cas_top(t, t+1) { | |
Empty | |
} else { | |
Have(o) | |
}; | |
self.bottom = t+1; | |
return val; | |
} | |
fn steal(&mut self) -> QueueResult<~T> { | |
let t = self.top; | |
let b = self.bottom; | |
let size = (b - t) as int; | |
if size <= 0 { | |
return Empty; | |
} | |
let o = self.active_buffer.take(t); | |
// The original just uses the the cas to check if it worked, | |
// but because we actually take the value, a race can also be | |
// detected when we get a zero value. | |
if owned_ptr_val(&o) != 0 && self.cas_top(t, t+1) { | |
Have(o) | |
} else { | |
Abort | |
} | |
} | |
priv fn cas_top(&mut self, old:u64, new:u64) -> bool { | |
let old = old as int; | |
let new = new as int; | |
unsafe { | |
atomic_cxchg(cast::transmute(&mut self.top), old, new) == old | |
} | |
} | |
priv fn try_shrink(&mut self, bot:u64, top:u64) { | |
let size = (bot - top) as uint; | |
if size < (self.active_buffer.len()/3) { // 3 is the K from the paper, K <= 3 | |
self.active_buffer = self.active_buffer.shrink(bot, top); | |
} | |
} | |
} | |
pub impl<T> WorkBuffer<T> { | |
fn new(size:uint) -> WorkBuffer<T> { | |
// Initialize the buffer to 0 | |
let buf = vec::from_fn(size, |_| init()); | |
WorkBuffer { buf:buf } | |
} | |
/** | |
* Takes the element from the buffer. This is unsafe | |
* because there may not be a valid element at the location. | |
*/ | |
unsafe fn take(&mut self, idx:u64) -> ~T { | |
let i = self.wrap(idx); | |
// This effectively pretends that we are just | |
// moving a value from some location, not moving | |
// it from inside a vector | |
do vec::as_mut_buf(self.buf) |p,_| { | |
let p = cast::transmute(ptr::mut_offset(p, i)); | |
cast::transmute(atomic_xchg(p, 0)) | |
} | |
} | |
unsafe fn put(&mut self, idx:u64, t:~T) { | |
let i = self.wrap(idx); | |
self.buf.unsafe_set(i, t); | |
} | |
fn len(&self) -> uint { | |
self.buf.len() | |
} | |
fn grow(&mut self, bot:u64, top:u64) -> WorkBuffer<T> { | |
debug!("Growing Buffer: %u -> %u", top as uint, bot as uint); | |
let mut buf = WorkBuffer::new(self.len() << 1); | |
for u64::range(top, bot) |i| { | |
buf.put(i, self.take(i)); | |
} | |
buf | |
} | |
fn shrink(&mut self, bot:u64, top:u64) -> WorkBuffer<T> { | |
debug!("Shrinking Buffer: %u -> %u", top as uint, bot as uint); | |
let mut buf = WorkBuffer::new(self.len() >> 1); | |
for u64::range(top, bot) |i| { | |
buf.put(i, self.take(i)); | |
} | |
buf | |
} | |
priv fn wrap(&self, i:u64) -> uint { | |
let l = self.len() as u64; | |
(i & (l-1)) as uint | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment