Created
January 27, 2019 17:12
-
-
Save li1/2206a10bebcce86d01b82a0936162a03 to your computer and use it in GitHub Desktop.
Understanding differential's group operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::convert::TryFrom;

use differential_dataflow::input::InputSession;
use differential_dataflow::operators::consolidate::Consolidate;
use differential_dataflow::operators::Group;
fn main() { | |
timely::execute_from_args(std::env::args(), move |worker| { | |
let mut input = InputSession::new(); | |
worker.dataflow(|scope| { | |
input | |
.to_collection(scope) | |
.inspect(|x| println!("1: inspect: {:?}", x)) | |
.map(|tuple| dbg!(tuple)) | |
.group(|key, inp, out| { | |
// for the second key: 6 | |
// `key`: &Key: 6 | |
// `inp`: &[(&val, count)] -> [(10, 1), (15, 2)] | |
// Note!: count is actually a Diff, and just abused in the group | |
// operator to do counting. This makes sense and is semantically very similar; | |
// a value with count == 0 doesn't really exist, just like two diffs +1 and -1 | |
// cancel each other out. | |
// `out`: &mut Vec<Val2, Diff2>: [] | |
println!("1: group: {:?}, {:?}, {:?}", key, inp, out); | |
// we want to build the product by group | |
let mut x = 1; | |
for (val, count) in inp { | |
x *= (**val as i32).pow(*count as u32); | |
} | |
// out describes the currently looked-at key's output value, so in the first "iteration" | |
// this results in (5, 150), in the second (6, 2250), etc. | |
// since this is an addition, we add diff == 1. | |
// (In fact, we could use any other number, and this can also be used to speed up algebraic overall computations.) | |
out.push((x, 1)); | |
// understanding diffing | |
// // shows diffing: some of these accumulate to 0 and thus can be removed | |
// out.push(((2, 8), 1)); | |
// out.push(((3, 9), 1)); | |
// out.push(((2, 8), -1)); | |
// out.push(((3, 9), 1)); | |
// // individual runs have nothing to do with each other | |
// // (2, 8) is only removed for key 6 | |
// out.push(((2, 8), 1)); | |
// out.push(((3, 9), 1)); | |
// if *key == 6 { | |
// out.push(((2, 8), -1)); | |
// } | |
// out.push(((3, 9), 1)); | |
}) | |
.inspect(|x| println!("1: inspect products: {:?}", x)) | |
// map isn't sufficient for logging here, since it doesn't pay attention to the temporal | |
// dimensions of the dataflow | |
.map(|tuple| dbg!(tuple)); | |
}); | |
// run a dataflow that completely sums up, "naive" way | |
worker.dataflow(|scope| { | |
input | |
.to_collection(scope) // this clones, so both dataflows can run without interfering | |
.inspect(|x| println!("2: inspect: {:?}", x)) | |
.map(|x| ((), x)) | |
.inspect(|x| println!("2: inspect post-map: {:?}", x)) | |
.group(|key, inp, out| { | |
println!("2: group: {:?}, {:?}, {:?}", key, inp, out); | |
let mut x = 0; | |
for ((_k, v), c) in inp { | |
x += v * c; | |
} | |
out.push((x, 1)); | |
}) | |
.map(|x| x.1) | |
.inspect(|x| println!("2: sum: {:?}", x)); | |
}); | |
// (nearly) the same thing, but using diffs to aid summing | |
worker.dataflow(|scope| { | |
input | |
.to_collection(scope) | |
.inspect(|x| println!("3: inspect: {:?}", x)) | |
// explode (similarly to inspect) returns an Option<(data, time, diff)> | |
.explode(|(_k, v)| Some(((), v))) | |
.inspect(|x| println!("3: inspect post explode: {:?}", x)) | |
// consolidate sums up the diffs, while grouping by data and time | |
// basically, it turns the diff stream into the bottom line view (cf. blockchain), | |
// but only at a point in time for that point in time | |
// since this also groups by time, the input at 1 will not update this | |
// computation but is treated separately | |
.consolidate() | |
.inspect(|x| println!("3: inspect post consolidate: {:?}", x)); | |
}); | |
// we record inputs for t = 0 | |
input.advance_to(0); // get started | |
input.insert((5, 10)); | |
input.insert((5, 15)); | |
input.insert((6, 10)); | |
input.insert((6, 15)); | |
input.insert((6, 15)); | |
input.insert((7, 10)); | |
input.insert((7, 10)); | |
input.insert((7, 20)); | |
input.insert((7, 20)); | |
// we promise that we're done with t = 0 | |
input.advance_to(1); | |
// and insert a (K,V)-pair for t = 1 | |
// this will lead to a retraction of our old group result | |
// and an addition of the new one | |
input.insert((5, 30)); | |
}) | |
.expect("Computation terminated abnormally"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment