Skip to content

Instantly share code, notes, and snippets.

@zonyitoo
Created May 13, 2015 03:16
Show Gist options
  • Save zonyitoo/a68c6331d88f37745b59 to your computer and use it in GitHub Desktop.
Save zonyitoo/a68c6331d88f37745b59 to your computer and use it in GitHub Desktop.
Rust Coroutine with work-stealing scheduler demo
#![feature(scoped)]
extern crate coroutine;
extern crate num_cpus;
extern crate deque;
#[macro_use] extern crate log;
extern crate env_logger;
use std::thread;
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use std::sync::{Mutex, Once, ONCE_INIT};
use std::mem;
use std::cell::UnsafeCell;
use coroutine::{spawn, sched};
use coroutine::coroutine::{State, Handle};
use deque::{BufferPool, Stealer, Worker, Stolen};
// Process-wide registry of every scheduler's (message sender, work stealer)
// pair. Starts out as a null pointer and is pointed at a heap-allocated
// `Mutex<Vec<..>>` the first time `schedulers()` runs.
static mut THREAD_HANDLES: *const Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>> =
0 as *const Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>>;
// Ensures the registry behind `THREAD_HANDLES` is allocated exactly once.
static THREAD_HANDLES_ONCE: Once = ONCE_INIT;
/// Returns the process-wide scheduler registry, allocating it on first use.
///
/// The registry is a mutex-protected list holding, for every scheduler that
/// has been created, a sender for its message channel and a stealer for its
/// work queue. The allocation is never freed.
fn schedulers() -> &'static Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>> {
    THREAD_HANDLES_ONCE.call_once(|| {
        let registry: Box<Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>>> =
            Box::new(Mutex::new(Vec::new()));
        // SAFETY: `call_once` runs this closure at most once, so this is the
        // only write to `THREAD_HANDLES`. The box is turned into a raw
        // pointer and thereby leaked on purpose.
        unsafe {
            THREAD_HANDLES = mem::transmute(registry);
        }
    });
    // SAFETY: `call_once` above has returned, so `THREAD_HANDLES` points at
    // the leaked registry, which is never deallocated.
    unsafe { &*THREAD_HANDLES }
}
// One `Scheduler` per thread, built with `Scheduler::new()`. It is wrapped in
// an `UnsafeCell` so that `Scheduler::current()` can hand out a mutable
// reference to it.
thread_local!(static SCHEDULER: UnsafeCell<Scheduler> = UnsafeCell::new(Scheduler::new()));
/// Control messages sent between schedulers over their `mpsc` channels.
enum SchedMessage {
/// Announces a newly created scheduler: the sender for its message channel
/// and a stealer for its work queue.
NewNeighbor(Sender<SchedMessage>, Stealer<Handle>),
}
/// Per-thread coroutine scheduler backed by a work-stealing deque.
struct Scheduler {
/// Owner side of this scheduler's deque; runnable coroutines are pushed here.
workqueue: Worker<Handle>,
/// Stealer side of this scheduler's own deque; `schedule` takes local work
/// through it.
workstealer: Stealer<Handle>,
/// Receiving end of this scheduler's message channel.
commchannel: Receiver<SchedMessage>,
/// Message senders and stealers of the other schedulers this one knows about.
neighbors: Vec<(Sender<SchedMessage>, Stealer<Handle>)>,
}
impl Scheduler {
fn new() -> Scheduler {
let bufpool = BufferPool::new();
let (worker, stealer) = bufpool.deque();
let (tx, rx) = channel();
let scheds = schedulers();
let mut guard = scheds.lock().unwrap();
for &(ref rtx, _) in guard.iter() {
let _ = rtx.send(SchedMessage::NewNeighbor(tx.clone(), stealer.clone()));
}
guard.push((tx, stealer.clone()));
Scheduler {
workqueue: worker,
workstealer: stealer,
commchannel: rx,
neighbors: guard.clone(),
}
}
fn current() -> &'static mut Scheduler {
SCHEDULER.with(|s| unsafe {
&mut *s.get()
})
}
fn spawn<F>(f: F)
where F: FnOnce() + Send + 'static {
let coro = spawn(f);
let sc = Scheduler::current();
sc.workqueue.push(coro);
}
fn schedule(&mut self) {
loop {
match self.commchannel.try_recv() {
Ok(SchedMessage::NewNeighbor(tx, st)) => {
self.neighbors.push((tx, st));
},
Err(TryRecvError::Empty) => {},
_ => panic!("Receiving from channel: Unknown message")
}
match self.workstealer.steal() {
Stolen::Data(work) => {
if let Err(msg) = work.resume() {
error!("Coroutine panicked! {:?}", msg);
}
match work.state() {
State::Suspended => self.workqueue.push(work),
_ => {}
}
continue;
},
Stolen::Empty => {
debug!("Nothing to do, try to steal from neighbors");
},
Stolen::Abort => {
error!("Abort!?");
}
}
for &(_, ref st) in self.neighbors.iter() {
match st.steal() {
Stolen::Empty => {},
Stolen::Data(coro) => {
if let Err(msg) = coro.resume() {
error!("Coroutine panicked! {:?}", msg);
}
match coro.state() {
State::Suspended => self.workqueue.push(coro),
_ => {}
}
break;
},
Stolen::Abort => {}
}
}
}
}
}
/// Demo entry point: queues four endlessly yielding coroutines on the main
/// thread's scheduler, then starts one scheduler thread per CPU and waits on
/// them.
fn main() {
    env_logger::init().unwrap();

    // Each coroutine prints its label and the name of the thread it is
    // currently running on, then yields back to the scheduler.
    Scheduler::spawn(|| {
        loop {
            println!("A in {}", thread::current().name().unwrap());
            sched();
        }
    });
    Scheduler::spawn(|| {
        loop {
            println!("B in {}", thread::current().name().unwrap());
            sched();
        }
    });
    Scheduler::spawn(|| {
        loop {
            println!("C in {}", thread::current().name().unwrap());
            sched();
        }
    });
    Scheduler::spawn(|| {
        loop {
            println!("D in {}", thread::current().name().unwrap());
            sched();
        }
    });

    // Start every worker before joining any of them.
    let workers: Vec<_> = (0..num_cpus::get())
        .map(|tid| {
            let builder = thread::Builder::new().name(format!("Thread {}", tid));
            builder.scoped(|| {
                Scheduler::current().schedule();
            }).unwrap()
        })
        .collect();

    for worker in workers {
        worker.join();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment