Created April 6, 2016 16:26
Shared via Rust Playground
//! This code demonstrates a dataflow-like programming pattern in which each
//! vertex of a DAG is associated with a long-running thread. The vertices
//! forward information along the edges, and the leaves just print out any data
//! they see. In this example the vertices perform no computation, whereas
//! normally they would.

use std::thread;
use std::sync;
use std::sync::mpsc;
use std::collections::HashMap;

/// Config holds information about the graph, and potentially other pieces of
/// information useful for the computation.
struct Config {
    /// Maps each vertex to the vertices it depends on (its upstream inputs).
    /// A vertex with no dependencies is a root of the DAG.
    tree: HashMap<u64, Vec<u64>>,
}

/// Manager keeps track of the vertex threads, and possibly stores other shared
/// state like database connection pools, etc. Dropping the Manager blocks
/// until all the vertex threads have terminated.
struct Manager {
    config: Config,
    wait: sync::Mutex<Vec<thread::JoinHandle<()>>>,
}

impl Manager {
    /// start() sets up channels for all the edges in the graph and spawns all
    /// the vertex threads. It returns the channel send endpoints for all the
    /// roots of the DAG so that users can inject data into the dataflow graph.
    fn start(&self) -> HashMap<u64, mpsc::SyncSender<u64>> {
        // root input channels
        let mut sources = HashMap::new();
        // edge send channels, keyed by the upstream vertex
        let mut feed_forward = HashMap::new();
        // edge read channels
        let mut inputs = HashMap::<u64, mpsc::Receiver<u64>>::new();

        for (thing, depends_on) in self.config.tree.iter() {
            let (send_input, recv_input) = mpsc::sync_channel(0);

            // register inputs to this vertex
            if depends_on.is_empty() {
                sources.insert(*thing, send_input);
            } else {
                for d in depends_on.iter() {
                    feed_forward
                        .entry(d)
                        .or_insert_with(Vec::new)
                        .push(send_input.clone());
                }
            }

            // store the receiving end of our incoming edges so "our" thread
            // can read from it later.
            inputs.insert(*thing, recv_input);
        }

        let mut w = self.wait.lock().unwrap();
        w.extend(inputs.into_iter().map(|(thing, recv_input)| {
            // And now, the problem:
            //
            // We want to let these threads borrow Manager. There could be
            // many reasons for this, such as allowing them to read the
            // config. This should be safe, because Manager waits for all the
            // threads before allowing itself to be dropped.
            //
            // Possible solutions:
            //  - Put Manager in a sync::Arc. But then drop() won't be called,
            //    since each thread would hold an outstanding strong reference
            //    to the Manager.
            //  - Do the above, but give the threads a sync::Weak. However,
            //    we then incur the unnecessary overhead of modifying the ref
            //    count on every access, plus extra code for upgrades
            //    everywhere.
            //  - Do the above, but have each thread cast the upgraded Weak
            //    to *const Manager on startup, and then unsafely cast it back
            //    to &Manager. It's ugly, but it works with no overhead.
            //
            // Really though, I should be able to just pass `&'a self` to the
            // threads, and have the resulting threads be limited to lifetime
            // 'a. The borrow checker should make sure that the JoinHandle is
            // joined before 'a ends. This probably requires JoinHandle (or a
            // variant thereof) to join on drop().

            // Find all our output edges
            let downstream = feed_forward.remove(&thing);

            thread::spawn(move || {
                // Keep reading inputs until every upstream sender is dropped
                for i in recv_input.iter() {
                    match &downstream {
                        // If we have outgoing edges, forward
                        Some(chs) => {
                            for ch in chs.iter() {
                                ch.send(i).unwrap();
                            }
                        }
                        // Otherwise, print
                        None => println!("leaf {} got {}", thing, i),
                    }
                }
            })
        }));

        sources
    }
}

impl Drop for Manager {
    /// The Senders returned from start() must be dropped before the Manager;
    /// otherwise the threads will never terminate and this drop() will block
    /// forever.
    fn drop(&mut self) {
        let mut ws = self.wait.lock().unwrap();
        for w in ws.drain(..) {
            // Wait for the thread to exit; a panic in a vertex thread is ignored.
            let _ = w.join();
        }
    }
}

fn main() {
    // Each entry maps a vertex to the vertices it depends on, so 1, 2 and 3
    // are the roots and 111 is the only leaf.
    let mut tree = HashMap::new();
    tree.insert(111, vec![10, 11]);
    tree.insert(10, vec![1, 2]);
    tree.insert(11, vec![3]);
    tree.insert(1, vec![]);
    tree.insert(2, vec![]);
    tree.insert(3, vec![]);

    let m = Manager {
        config: Config { tree },
        wait: sync::Mutex::new(Vec::new()),
    };

    let s = m.start();
    // The value flows 1 -> 10 -> 111, where the leaf prints it.
    s[&1].send(9000).unwrap();

    // Need to drop root inputs before dropping manager.
    // This closes all the edges in turn, causing all the threads to exit.
    drop(s);
    drop(m);
}
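
The wish expressed in the comments of start(), threads that borrow `&self` and are guaranteed to be joined before the borrow ends, is roughly what `std::thread::scope` provides (stable since Rust 1.63, so well after this gist was written). The sketch below is not the gist's code: `run_scoped` and the trimmed-down `Config` are hypothetical names used only for illustration, and the threads just report the size of their dependency list instead of forwarding data along edges.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the gist's Config.
struct Config {
    tree: HashMap<u64, Vec<u64>>,
}

/// Spawns one scoped thread per vertex. Each thread borrows straight from
/// `config`, with no Arc, Weak or raw pointers involved. Returns
/// (vertex, number of dependencies) pairs, sorted by vertex.
fn run_scoped(config: &Config) -> Vec<(u64, usize)> {
    let (tx, rx) = mpsc::channel();

    // thread::scope joins every thread spawned inside it before returning,
    // which is what lets the closures hold plain references into `config`.
    thread::scope(|s| {
        for (&vertex, deps) in &config.tree {
            let tx = tx.clone();
            s.spawn(move || {
                // `deps` is a borrowed &Vec<u64> owned by `config`.
                tx.send((vertex, deps.len())).unwrap();
            });
        }
    });

    // All threads are finished here; drop the last sender so the
    // receiver's iterator terminates.
    drop(tx);
    let mut out: Vec<(u64, usize)> = rx.iter().collect();
    out.sort();
    out
}

fn main() {
    let mut tree = HashMap::new();
    tree.insert(111, vec![10, 11]);
    tree.insert(10, vec![1, 2]);
    tree.insert(1, vec![]);
    let config = Config { tree };

    for (vertex, n_deps) in run_scoped(&config) {
        println!("vertex {} depends on {} vertices", vertex, n_deps);
    }
}
```

The trade-off is that the scope itself blocks until every thread exits, so the threads cannot outlive the call that spawned them. A Manager whose start() returns while the vertex threads keep running, as in the gist, would have to open the scope in its caller instead.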