Skip to content

Instantly share code, notes, and snippets.

@li1
Created January 27, 2019 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save li1/2206a10bebcce86d01b82a0936162a03 to your computer and use it in GitHub Desktop.
Save li1/2206a10bebcce86d01b82a0936162a03 to your computer and use it in GitHub Desktop.
Understanding differential's group operator
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::consolidate::Consolidate;
use differential_dataflow::operators::Group;
/// Walks through differential dataflow's `group` operator and, for comparison,
/// `explode` + `consolidate`, on a tiny `(key, value)` input.
///
/// Three independent dataflows read the same `InputSession`:
/// 1. the product of the values per key, via `group`;
/// 2. the sum of all values, via `map` to a unit key followed by `group`;
/// 3. the same sum, computed by `explode`-ing each value into the diff and
///    letting `consolidate` add the diffs up.
///
/// Data is loaded at time 0. One extra record is inserted at time 1, so the
/// printed output also shows results being retracted and re-issued.
///
/// Accepts timely's usual command-line flags (e.g. `-w 2`). Only worker 0
/// loads data, so the results do not depend on the number of workers.
fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        // Every worker runs this closure. If each of them inserted the records
        // below, the input would contain one copy per worker and every count
        // (and therefore every product and sum) would depend on `-w`.
        let index = worker.index();

        let mut input = InputSession::new();

        // Dataflow 1: product of the values, per key.
        worker.dataflow(|scope| {
            input
                .to_collection(scope)
                .inspect(|x| println!("1: inspect: {:?}", x))
                .map(|tuple| dbg!(tuple))
                .group(|key, inp, out| {
                    // for the second key: 6
                    // `key`: &Key: 6
                    // `inp`: &[(&Val, count)] -> [(10, 1), (15, 2)]
                    // Note!: count is actually a Diff, and just abused in the group
                    // operator to do counting. This makes sense and is semantically very similar;
                    // a value with count == 0 doesn't really exist, just like two diffs +1 and -1
                    // cancel each other out.
                    // `out`: &mut Vec<(Val2, Diff2)>: []
                    println!("1: group: {:?}, {:?}, {:?}", key, inp, out);
                    // Build the product per group: each distinct value contributes
                    // value^count.
                    let mut x = 1;
                    for (val, count) in inp {
                        // Counts are positive here because records are only ever
                        // inserted, never retracted. A negative count would wrap
                        // to a huge exponent under `as u32`.
                        x *= (**val as i32).pow(*count as u32);
                    }
                    // `out` holds the output values for the key currently being looked at.
                    // So in the first "iteration" this results in (5, 150), in the
                    // second (6, 2250), etc.
                    // Since this is an addition, we add diff == 1.
                    // (In fact, we could use any other number, and this can also be used
                    // to speed up algebraic overall computations.)
                    out.push((x, 1));
                    // understanding diffing
                    // // shows diffing: some of these accumulate to 0 and thus can be removed
                    // out.push(((2, 8), 1));
                    // out.push(((3, 9), 1));
                    // out.push(((2, 8), -1));
                    // out.push(((3, 9), 1));
                    // // individual runs have nothing to do with each other
                    // // (2, 8) is only removed for key 6
                    // out.push(((2, 8), 1));
                    // out.push(((3, 9), 1));
                    // if *key == 6 {
                    //     out.push(((2, 8), -1));
                    // }
                    // out.push(((3, 9), 1));
                })
                .inspect(|x| println!("1: inspect products: {:?}", x))
                // map isn't sufficient for logging here: dbg! only sees the data,
                // not the time and diff that come with each update.
                .map(|tuple| dbg!(tuple));
        });

        // Dataflow 2: sum everything up, the "naive" way.
        worker.dataflow(|scope| {
            input
                // Each to_collection call gives this dataflow its own copy of the
                // input stream, so the dataflows don't interfere with each other.
                .to_collection(scope)
                .inspect(|x| println!("2: inspect: {:?}", x))
                // Move every record under the same unit key so a single group sees them all.
                .map(|x| ((), x))
                .inspect(|x| println!("2: inspect post-map: {:?}", x))
                .group(|key, inp, out| {
                    println!("2: group: {:?}, {:?}, {:?}", key, inp, out);
                    // Weight each value by how many times it occurs.
                    let mut x = 0;
                    for ((_k, v), c) in inp {
                        x += v * c;
                    }
                    out.push((x, 1));
                })
                // Drop the unit key and keep just the sum.
                .map(|x| x.1)
                .inspect(|x| println!("2: sum: {:?}", x));
        });

        // Dataflow 3: (nearly) the same sum, but letting the diffs do the adding.
        worker.dataflow(|scope| {
            input
                .to_collection(scope)
                .inspect(|x| println!("3: inspect: {:?}", x))
                // explode's closure returns an iterator of (data, diff) pairs.
                // Here it is an Option holding a single pair, and there is no time
                // component. Putting `v` in the diff position makes every record a
                // `()` whose diff is its value (combined with the record's
                // original diff).
                .explode(|(_k, v)| Some(((), v)))
                .inspect(|x| println!("3: inspect post explode: {:?}", x))
                // consolidate sums up the diffs, grouping by data and time.
                // Basically, it turns the diff stream into the bottom-line view
                // (cf. blockchain), but separately for each point in time.
                // Because it also groups by time, the input at 1 does not update the
                // result for time 0; it is reported as its own update.
                .consolidate()
                .inspect(|x| println!("3: inspect post consolidate: {:?}", x));
        });

        // Record inputs for t = 0. Every worker advances its input (so time can
        // make progress everywhere), but only worker 0 supplies data.
        input.advance_to(0); // get started
        if index == 0 {
            input.insert((5, 10));
            input.insert((5, 15));
            input.insert((6, 10));
            input.insert((6, 15));
            input.insert((6, 15));
            input.insert((7, 10));
            input.insert((7, 10));
            input.insert((7, 20));
            input.insert((7, 20));
        }
        // we promise that we're done with t = 0
        input.advance_to(1);
        // Insert one more (K, V) pair at t = 1. This leads to a retraction of
        // the old group result for key 5 and an addition of the new one.
        if index == 0 {
            input.insert((5, 30));
        }
    })
    .expect("Computation terminated abnormally");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment