Skip to content

Instantly share code, notes, and snippets.

@li1
Created March 20, 2019 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save li1/7f55e281f73e2649d1ed1cf6509b906b to your computer and use it in GitHub Desktop.
Save li1/7f55e281f73e2649d1ed1cf6509b906b to your computer and use it in GitHub Desktop.
reorder operator example
use std::collections::HashMap;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::operators::unordered_input::UnorderedInput;
use timely::dataflow::operators::Inspect;
/// Demonstrates a "reorder" operator built with `unary_notify`.
///
/// Records are fed into an unordered input at out-of-order timestamps. The
/// operator buffers every incoming batch under its timestamp and only emits a
/// timestamp's batches once the notificator reports that timestamp, so the
/// second `inspect_batch` observes the records grouped by completed time.
///
/// After all records are sent, the input handle and its root capability are
/// dropped and the worker is stepped until the dataflow finishes. Only then
/// does the program wait for a line on stdin, so the output is visible before
/// the process blocks.
///
/// # Panics
///
/// Panics if the timely computation fails to start or if reading from stdin
/// fails.
fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let (mut input, cap) = worker.dataflow::<usize, _, _>(|scope| {
            // Batches received so far, keyed by timestamp, waiting for a notification.
            let mut queues = HashMap::new();
            let (input, stream) = scope.new_unordered_input();
            stream
                // Show the records in the order in which they enter the dataflow.
                .inspect_batch(move |epoch, data| {
                    for d in data {
                        println!("{}@{:?}", epoch, d);
                    }
                })
                .unary_notify(
                    Pipeline,
                    "reorder",
                    None,
                    move |input, output, notificator| {
                        // Stash each batch under its timestamp and request a
                        // notification for that timestamp.
                        input.for_each(|cap, data| {
                            queues
                                .entry(cap.time().clone())
                                .or_insert_with(Vec::new)
                                .push(data.replace(Vec::new()));
                            println!("{:?}", queues);
                            notificator.notify_at(cap.retain());
                        });
                        // Runs once per notified timestamp: release everything
                        // that was buffered for it. This can only happen after
                        // the root input capability has been dropped (see below).
                        notificator.for_each(|cap, _cnt, _not| {
                            let mut session = output.session(&cap);
                            println!("done with time {:?}", &cap);
                            if let Some(list) = queues.remove(cap.time()) {
                                for mut vector in list {
                                    session.give_vec(&mut vector);
                                }
                            }
                        });
                    },
                )
                // Show the records in the order in which the operator releases them.
                .inspect_batch(move |epoch, data| {
                    for d in data {
                        println!("{}@{:?}", epoch, d);
                    }
                });
            input
        });

        // Deliberately out of timestamp order.
        input.session(cap.delayed(&0)).give('D');
        input.session(cap.delayed(&2)).give('B');
        input.session(cap.delayed(&1)).give('A');
        input.session(cap.delayed(&2)).give('b');
        input.session(cap.delayed(&3)).give('C');
        input.session(cap.delayed(&3)).give('c');
        input.session(cap.delayed(&1)).give('a');

        // While `cap` is alive the input frontier cannot advance, so no
        // notification would ever be delivered. Release the handle and the
        // capability to signal that no further data will arrive.
        drop(input);
        drop(cap);

        // A single step is not enough for progress information to reach the
        // notificator; keep stepping until the dataflow reports completion.
        while worker.step() {}

        // Keep the process alive until the user presses Enter.
        std::io::stdin()
            .read_line(&mut String::new())
            .expect("failed to read a line from stdin");
    })
    .unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment