Skip to content

Instantly share code, notes, and snippets.

@utaal
Created October 18, 2017 13:00
Show Gist options
  • Save utaal/7ca2ef09f2b0a2ac33ceedb7cf17180f to your computer and use it in GitHub Desktop.
Save utaal/7ca2ef09f2b0a2ac33ceedb7cf17180f to your computer and use it in GitHub Desktop.
extern crate timely;
use std::rc::Rc;
use std::cell::RefCell;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe, Map};
/// Builds and drives a small timely dataflow that records every input value in
/// a map inside a `map` operator and reads it back inside an `inspect`
/// operator placed after a second `exchange`.
///
/// The map is an `Rc<RefCell<..>>` created inside the per-worker closure, so it
/// is local to the worker that built it and is never shared between workers.
///
/// # Panics
///
/// Panics with `"x not in hashmap"` if the `inspect` stage sees a value that
/// the `map` stage of the *same* worker never inserted.
/// NOTE(review): with more than one worker the two `exchange` calls appear to
/// route records to different workers (keys 0 and 1), which would trigger this
/// panic; confirm against the timely `Exchange` documentation.
/// Also panics if `execute_from_args` returns an error.
fn main() {
    // Initializes and runs a timely dataflow; the closure is the per-worker program.
    timely::execute_from_args(std::env::args(), |worker| {
        let index = worker.index();
        let mut input = InputHandle::new();

        // Create a new input, exchange data, and inspect its output.
        let probe = worker.dataflow(|scope| {
            // Worker-local state shared by the two operator closures below.
            let hashmap: Rc<RefCell<std::collections::HashMap<u32, u32>>> =
                Rc::new(RefCell::new(std::collections::HashMap::new()));
            let seen_by_map = Rc::clone(&hashmap);
            // The original handle is not needed afterwards, so move it instead
            // of cloning a second time.
            let seen_by_inspect = hashmap;

            scope
                .input_from(&mut input)
                // Constant routing key 0 for every record.
                .exchange(|_| 0)
                .map(move |x: u32| {
                    seen_by_map.borrow_mut().insert(x, x);
                    x
                })
                // Constant routing key 1 for every record.
                .exchange(|_| 1)
                .inspect(move |x| {
                    // `x` is already a `&u32`; pass it straight to `get`.
                    // Passing `&x` would be a `&&u32`, which does not satisfy
                    // the `Borrow` bound of `HashMap<u32, _>::get`.
                    println!(
                        "{}",
                        seen_by_inspect.borrow().get(x).expect("x not in hashmap")
                    );
                })
                .probe()
        });

        // Introduce data and watch!
        for round in 0..10 {
            // Only worker 0 feeds data; every worker still advances its input.
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            // Keep stepping the worker until the probe has caught up with the
            // input's current time.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    })
    .unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment