Skip to content

Instantly share code, notes, and snippets.

@cuviper
Created January 1, 2016 10:34
Show Gist options
  • Save cuviper/cf7a055c468d0447b570 to your computer and use it in GitHub Desktop.
Save cuviper/cf7a055c468d0447b570 to your computer and use it in GitHub Desktop.
alt latch condvars for nikomatsakis/rayon#9
diff --git a/src/api.rs b/src/api.rs
index 5d7871d5bc06..e122eb4dce16 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,4 +1,4 @@
-use latch::Latch;
+use latch::{ProbingLatch, WaitingLatch};
#[allow(unused_imports)]
use log::Event::*;
use job::{Code, CodeImpl, Job};
@@ -45,7 +45,7 @@ pub fn join<A,B,RA,RB>(oper_a: A,
// done here so that the stack frame can keep it all live
// long enough
let mut code_b = CodeImpl::new(oper_b, &mut result_b);
- let mut latch_b = Latch::new();
+ let mut latch_b = ProbingLatch::new();
let mut job_b = Job::new(&mut code_b, &mut latch_b);
(*worker_thread).push(&mut job_b);
@@ -77,12 +77,12 @@ unsafe fn join_inject<A,B,RA,RB>(oper_a: A,
{
let mut result_a = None;
let mut code_a = CodeImpl::new(oper_a, &mut result_a);
- let mut latch_a = Latch::new();
+ let mut latch_a = WaitingLatch::new();
let mut job_a = Job::new(&mut code_a, &mut latch_a);
let mut result_b = None;
let mut code_b = CodeImpl::new(oper_b, &mut result_b);
- let mut latch_b = Latch::new();
+ let mut latch_b = WaitingLatch::new();
let mut job_b = Job::new(&mut code_b, &mut latch_b);
thread_pool::get_registry().inject(&[&mut job_a, &mut job_b]);
@@ -112,7 +112,7 @@ impl ThreadPool {
unsafe {
let mut result_a = None;
let mut code_a = CodeImpl::new(op, &mut result_a);
- let mut latch_a = Latch::new();
+ let mut latch_a = WaitingLatch::new();
let mut job_a = Job::new(&mut code_a, &mut latch_a);
self.registry.inject(&[&mut job_a]);
latch_a.wait();
diff --git a/src/latch.rs b/src/latch.rs
index a81831d1cc7a..aa6f34387ef6 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -1,34 +1,69 @@
+use std::sync::{Mutex, Condvar};
use std::sync::atomic::{AtomicBool, Ordering};
-use std::thread;
-/// A Latch starts as false and eventually becomes true. You can block
-/// until it becomes true.
-pub struct Latch {
+/// A Latch starts as false and eventually becomes true.
+/// The means to check its state depends on the impl.
+pub trait Latch {
+
+ /// Set the latch to true, releasing all threads who are waiting.
+ fn set(&self);
+}
+
+
+/// A ProbingLatch can be probed to see if it's true yet.
+pub struct ProbingLatch {
b: AtomicBool
}
-impl Latch {
+impl Latch for ProbingLatch {
+ fn set(&self) {
+ self.b.store(true, Ordering::SeqCst);
+ }
+}
+
+impl ProbingLatch {
#[inline]
- pub fn new() -> Latch {
- Latch {
+ pub fn new() -> ProbingLatch {
+ ProbingLatch {
b: AtomicBool::new(false)
}
}
- /// Set the latch to true, releasing all threads who are waiting.
- pub fn set(&self) {
- self.b.store(true, Ordering::SeqCst);
+ /// Test if latch is set.
+ pub fn probe(&self) -> bool {
+ self.b.load(Ordering::SeqCst)
+ }
+}
+
+
+/// A WaitingLatch can efficiently wait until it's true.
+pub struct WaitingLatch {
+ m: Mutex<bool>,
+ c: Condvar,
+}
+
+impl Latch for WaitingLatch {
+ fn set(&self) {
+ let mut guard = self.m.lock().unwrap();
+ *guard = true;
+ self.c.notify_all();
}
+}
- /// Spin until latch is set. Use with caution.
- pub fn wait(&self) {
- while !self.probe() {
- thread::yield_now();
+impl WaitingLatch {
+ #[inline]
+ pub fn new() -> WaitingLatch {
+ WaitingLatch {
+ m: Mutex::new(false),
+ c: Condvar::new(),
}
}
- /// Test if latch is set.
- pub fn probe(&self) -> bool {
- self.b.load(Ordering::SeqCst)
+ /// Block until latch is set, sleeping on the condvar rather than spinning. Use with caution.
+ pub fn wait(&self) {
+ let mut guard = self.m.lock().unwrap();
+ while !*guard {
+ guard = self.c.wait(guard).unwrap();
+ }
}
}
diff --git a/src/thread_pool.rs b/src/thread_pool.rs
index 28cbefc2b5c6..163ef719aa4a 100644
--- a/src/thread_pool.rs
+++ b/src/thread_pool.rs
@@ -1,6 +1,6 @@
use deque::{BufferPool, Worker, Stealer, Stolen};
use job::Job;
-use latch::Latch;
+use latch::{Latch, ProbingLatch, WaitingLatch};
#[allow(unused_imports)]
use log::Event::*;
use num_cpus;
@@ -145,7 +145,7 @@ impl RegistryState {
struct ThreadInfo {
// latch is set once thread has started and we are entering into
// the main loop
- primed: Latch,
+ primed: WaitingLatch,
worker: Worker<JobRef>,
stealer: Stealer<JobRef>,
}
@@ -154,7 +154,7 @@ impl ThreadInfo {
fn new(pool: &BufferPool<JobRef>) -> ThreadInfo {
let (worker, stealer) = pool.deque();
ThreadInfo {
- primed: Latch::new(),
+ primed: WaitingLatch::new(),
worker: worker,
stealer: stealer,
}
@@ -219,7 +219,7 @@ impl WorkerThread {
self.thread_info().worker.pop().is_some()
}
- pub unsafe fn steal_until(&self, latch: &Latch) {
+ pub unsafe fn steal_until(&self, latch: &ProbingLatch) {
while !latch.probe() {
if let Some(job) = steal_work(&self.registry, self.index) {
(*job).execute();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment