Last active April 16, 2019 06:52
The shipping puzzle in Differential Dataflow
extern crate abomonation_derive;
extern crate abomonation;
extern crate timely;
extern crate differential_dataflow;
use std::fs::File;
use std::io::{BufRead,BufReader};
use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;
use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::ArrangeByKey;
/// Day on which a shipping leg runs.
///
/// The variants are presumably Monday through Friday (`R` standing in for
/// Thursday); the input parser maps the letters "M", "T", "W", "R", "F" onto them.
/// Declaration order is significant: the derived `Ord`/`PartialOrd` compare
/// variants in the order they are listed here.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Debug, Abomonation)]
enum Day {
    M,
    T,
    W,
    R,
    F,
}
/// A single shipping leg as read from the input: `(id, day, start, end)`.
type Leg = (u64, Day, String, String);
/// A route under construction: `(leg ids, destination, is_departing)`.
/// The boolean distinguishes departing routes (`true`) from arriving ones (`false`).
type Route = (Vec<u64>,String,bool);
/// Key under which routes are grouped for pairing: `(day, place)`.
type Meetpoint = (Day,String);
fn next_day(day: &Day) -> Day {
match &day {
Day::M => Day::T,
Day::T => Day::W,
Day::W => Day::R,
Day::R => Day::F,
Day::F => Day::F,
/// Builds the initial `(Meetpoint, Route)` record for a leg, given the leg's
/// day and its `(id, start, end)` triple.
///
/// `main` passes this function to `as_collection` for the legs whose day is
/// `T`, `W`, `R` or `F`.
// NOTE(review): the body of this function is not present in this excerpt, so
// the exact meetpoint and route it produces cannot be confirmed from here.
fn initialize_route(day: &Day, (id,start,end): &(u64,String,String)) -> (Meetpoint,Route) {
// Entry point: builds a dataflow over shipping legs, loads the legs from a
// text file, and prints elapsed times once the dataflow has caught up with the
// input, first for the initial load and then again after one extra leg.
// NOTE(review): this excerpt has lost its closing delimiters and appears to be
// missing some statements; the comments below describe only the visible lines.
fn main() {
// Start timely with the process arguments; the closure receives a `worker`.
timely::execute_from_args(std::env::args(), move |worker| {
// Wall-clock reference for the progress messages printed below.
let timer = ::std::time::Instant::now();
// Probe attached to the end of the dataflow; compared against the input's
// time further down to decide when to stop stepping the worker.
let mut probe = Handle::new();
// Construct the dataflow with `u64` timestamps. The value bound to `legs_in`
// is later used as the input handle (see `legs_in.insert` below).
let mut legs_in = worker.dataflow::<u64, _, _>(|scope| {
// Input handle plus the collection of raw legs, with `isize` differences.
let (legs_in, raw_legs) = scope.new_collection::<Leg,isize>();
// Re-key every leg by its day: (id, day, start, end) -> (day, (id, start, end)).
let start_on = raw_legs
.map(|(id,day,start,end)| (day, (id,start,end)))
// NOTE(review): the uses of `start_on` below take (key, value) closures and
// call `as_collection`, so an arrangement step (e.g. `arrange_by_key`, which
// is imported) is presumably missing after the `map` above - confirm.
let routes = start_on
// Legs on day `M` seed the routes directly: they are keyed by the following
// day and the leg's end point, and flagged as not departing (`false`).
.filter(|day,_leg| day == &Day::M)
.as_collection(|day, (id,_start,end)| ((next_day(&day),end.to_string()),(vec![*id],end.to_string(),false)))
// Legs on each of the remaining days are converted by `initialize_route`
// and concatenated onto the seed routes.
.concat(&start_on.filter(|day,_leg| day == &Day::T).as_collection(initialize_route))
.concat(&start_on.filter(|day,_leg| day == &Day::W).as_collection(initialize_route))
.concat(&start_on.filter(|day,_leg| day == &Day::R).as_collection(initialize_route))
.concat(&start_on.filter(|day,_leg| day == &Day::F).as_collection(initialize_route))
// Drop the payload, print whatever `inspect` observes, and attach the probe.
.map(|_| ())
.inspect(|x| { println!("found {:?} routes", x); })
.probe_with(&mut probe);
// Open the legs file from a hard-coded absolute path; `unwrap` panics if the
// file cannot be opened.
let input = BufReader::new(File::open("/Users/niko/data/shipping-puzzle/legs.txt").unwrap());
// This worker's index and the total number of workers, used to split the
// input lines between workers.
let index = worker.index();
let num_workers = worker.peers();
let mut line_count = 0;
for line in input.lines() {
// Only handle the lines whose running count maps to this worker.
if line_count % num_workers == index {
// `unwrap` panics if the line cannot be read.
let l = line.unwrap();
// Fields are separated by single spaces: id, two place names, day letter.
let vals: Vec<&str> = l.split(" ").collect();
// Indexing and `parse().unwrap()` panic on short or non-numeric lines.
let id: u64 = vals[0].parse().unwrap();
// Translate the day letter in the fourth field; anything else aborts.
let day: Day = match vals[3] {
"M" => Day::M,
"T" => Day::T,
"W" => Day::W,
"R" => Day::R,
"F" => Day::F,
_ => panic!("Invalid day found: {}", vals[3]),
// Feed the parsed leg into the dataflow input.
legs_in.insert((id, day, vals[1].to_string(), vals[2].to_string()));
line_count = line_count + 1;
println!("{:?}\tloaded", timer.elapsed());
// Keep stepping the worker while the probe is still behind the input's time.
worker.step_while(|| probe.less_than(legs_in.time()));
println!("{:?}\tstable", timer.elapsed());
// Insert one additional hard-coded leg, then step and report timing again.
legs_in.insert((20000, Day::W, "LOS_ANGELES".to_string(), "CHARLOTTE".to_string()));
worker.step_while(|| probe.less_than(legs_in.time()));
println!("{:?}\tstable", timer.elapsed());
/// Extension trait that adds a `pair_up` step to collections of
/// `(Meetpoint, Route)` records living in scope `G`.
trait PairUp<G>
where
    G: Scope,
{
    /// Produces a new collection of `(Meetpoint, Route)` records derived from
    /// the records in `self`.
    fn pair_up(&self) -> Collection<G, (Meetpoint, Route), isize>;
}
// Implementation of `PairUp` for collections of `(Meetpoint, Route)` records
// whose timestamps form a lattice.
// NOTE(review): this excerpt has lost its closing delimiters, and the receiver
// expression that `.group(..)` is called on is not visible - confirm against
// the complete source.
impl<G> PairUp<G> for Collection<G, (Meetpoint,Route), isize>
where G: Scope, G::Timestamp: Lattice,
// Matches arriving and departing routes within each `(day, place)` group and
// emits the resulting routes under their next meetpoint.
fn pair_up(&self) -> Collection<G, (Meetpoint,Route)> {
// One invocation per `(day, place)` key; `input` holds the routes at that key
// and `output` receives `(record, 1)` entries.
.group(|(day,place), input, output| {
// Split the routes into arrivals (flag is `false`) and departures (flag is
// `true`), discarding the per-record difference.
let (mut arrivals, mut departures): (Vec<Route>,Vec<Route>) = input.iter().cloned()
.map(|(x,_diff)| x.clone())
.partition(|(_,_,is_departing)| *is_departing == false);
// Pair arrivals with departures positionally; `zip` stops at the shorter list.
let pairs = arrivals.iter().zip(departures.iter());
for ((arriving,_,_),(departing,to,_)) in pairs {
// NOTE(review): `route` is declared `mut` and `departing` is bound, but no
// visible line modifies `route` or uses `departing`; a statement combining
// the two may be missing from this excerpt.
let mut route = arriving.clone();
// Emit the paired route at the departure's destination on the following day.
output.push((((next_day(day),to.clone()),(route,to.clone(),false)), 1))
// Unpaired arrivals are re-emitted under the same `(day, place)` key.
if arrivals.len() > departures.len() {
for (route,to,_) in arrivals.drain(departures.len()..) {
output.push((((day.clone(),place.clone()),(route,to,false)), 1))
// Unpaired departures are emitted on their own at their destination on the
// following day, flagged as not departing.
} else if departures.len() > arrivals.len() {
for (route,to,_) in departures.drain(arrivals.len()..) {
output.push((((next_day(day),to.clone()),(route,to,false)), 1))
// Drop the group key and keep the pushed value, which is itself a
// `(Meetpoint, Route)` pair despite the closure naming it `route`.
.map(|(_meetpoint,route)| route)
