Skip to content

Instantly share code, notes, and snippets.

@spastorino
Created May 15, 2018 20:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save spastorino/2913826a97e56aa5f3470152d3ec4c04 to your computer and use it in GitHub Desktop.
// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Timely dataflow runs on its own thread.
use crate::facts::AllFacts;
use crate::output::Output;
use differential_dataflow::collection::Collection;
use differential_dataflow::operators::*;
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use timely;
use timely::dataflow::operators::*;
/// Runs the borrow-check computation over `all_facts` on timely dataflow
/// worker threads and collects the results into an `Output`.
///
/// `borrow_live_at` is always recorded into the returned `Output`; when
/// `dump_enabled` is true, the intermediate relations (`region_live_at`,
/// `subset`, `requires`) are recorded as well.
pub(super) fn compute(dump_enabled: bool, all_facts: AllFacts) -> Output {
// Shared accumulator that every worker's `inspect_batch` callback writes into.
let result = Arc::new(Mutex::new(Output::new(dump_enabled)));
// Use a channel to send `all_facts` to one worker (and only one)
let (tx, rx) = mpsc::channel();
tx.send(all_facts).unwrap();
// Drop the sender so that, after the one queued message is taken, further
// `recv()` calls return `Err` instead of blocking forever.
mem::drop(tx);
// `Receiver` is not `Sync`; the `Mutex` lets the worker closure be shared
// across timely's worker threads.
let rx = Mutex::new(rx);
timely::execute_from_args(vec![].into_iter(), {
// Clone the handle so the `move` closure owns its own `Arc`.
let result = result.clone();
move |worker| {
// First come, first serve: one worker gets all the facts;
// the others get empty vectors.
let my_facts = rx.lock()
.unwrap()
.recv()
.unwrap_or_else(|_| AllFacts::default());
// Timestamp type `()`: a single epoch, never advanced — the whole
// computation runs to fixpoint in one round.
worker.dataflow::<(), _, _>(|scope| {
// Expands each named `Vec` field of `$base` into a differential
// `Collection` with the default timestamp and multiplicity 1.
macro_rules! let_collections {
(let ($($facts:ident,)*) = ..$base:expr;) => {
let ($($facts),*) = (
$(Collection::<_, _, isize>::new(
$base.$facts
.to_stream(scope)
.map(|datum| (datum, Default::default(), 1)),
),)*
);
}
}
let_collections! {
let (
borrow_region,
universal_region,
cfg_edge,
killed,
outlives,
region_live_at,
) = ..my_facts;
}
// .decl subset(Ra, Rb) -- `R1 <= R2` holds
//
// subset_base(Ra, Rb) :- outlives(Ra, Rb, _P)
// NOTE(review): the point component of `outlives` is discarded here, so
// `subset_base` is point-insensitive — confirm that is intended.
let subset_base = outlives
.map(|(r_a, r_b, _p)| (r_a, r_b))
.distinct_total();
// compute transitive closure of the subset relation
// -- just an example though, we don't really need to do this I don't think
// REMOVE THIS
// NOTE(review): `subset` is only consumed by the `dump_enabled` output
// below, consistent with the REMOVE THIS marker above.
let subset = subset_base.iterate(|subset| {
// Bring the static base relation into the iterative scope.
let subset_base = subset_base.enter(&subset.scope());
// subset(Ra, Rb) :- subset_base(Ra, Rb)
let subset1 = subset_base.clone();
// subset(Ra, Rc) :-
// subset(Ra, Rb),
// subset_base(Rb, Rc).
// The map re-keys on Rb so the join matches Rb against
// subset_base's first column; join yields flattened triples.
let subset2 =
subset.map(|(r_a, r_b)| (r_b, r_a))
.join(&subset_base)
.map(|(r_b, r_a, r_c)| (r_a, r_c));
subset1.concat(&subset2).distinct_total()
});
// requires(R, L) :- borrow_region(R, L, _P).
// NOTE(review): built here as 2-tuples `(region, loan)` — the point is
// dropped. See the arity mismatch flagged at `borrow_live_at` below.
let requires = borrow_region
.map(|(r, l, _p)| (r, l))
.distinct_total();
// requires(R2, L) :- requires(R1, L), subset(R1, R2).
// NOTE(review): the rule comment says `subset(R1, R2)`, but the code
// joins against `subset_base` (entered into this scope) — with the
// transitive closure computed via the iteration instead. Confirm the
// comment/code mismatch is deliberate.
let requires = requires.iterate(|requires| {
let subset_base = subset_base.enter(&requires.scope());
requires
.join(&subset_base)
.map(|(r1, l, r2)| (r2, l))
.distinct_total()
});
//potential_errors(L, P) :-
// invalidated(L, P),
// requires(R, L),
// region_live_at(R, P).
// .decl borrow_live_at(B, P) -- true if the restrictions of the borrow B
// need to be enforced at the point P
// NOTE(review): the maps below destructure `requires` as 3-tuples
// `(r, b, p)`, but `requires` was constructed above as 2-tuples
// `(r, l)` — this looks like an arity mismatch that would not
// type-check; presumably `requires` was meant to carry a point
// component (as in the `requires(R, B, P)` rule comments). TODO confirm.
let borrow_live_at = {
// borrow_live_at(B, P) :- requires(R, B, P), region_live_at(R, P)
let borrow_live_at1 = requires
.map(|(r, b, p)| ((r, p), b))
.semijoin(&region_live_at)
.map(|((_r, p), b)| (b, p));
// borrow_live_at(B, P) :- requires(R, B, P), universal_region(R).
let borrow_live_at2 = requires
.map(|(r, b, p)| (r, (p, b)))
.semijoin(&universal_region)
.map(|(_r, (p, b))| (b, p));
borrow_live_at1.concat(&borrow_live_at2).distinct()
};
// Optionally record intermediate relations for debugging/dump output.
// Each `inspect_batch` callback asserts multiplicity 1, i.e. every fact
// is expected to appear exactly once after `distinct`.
if dump_enabled {
region_live_at.inspect_batch({
let result = result.clone();
move |_timestamp, facts| {
let mut result = result.lock().unwrap();
for ((region, location), _timestamp, multiplicity) in facts {
assert_eq!(*multiplicity, 1);
result
.region_live_at
.entry(*location)
.or_insert(vec![])
.push(*region);
}
}
});
// NOTE(review): this destructures `subset` facts as `(r1, r2, location)`
// triples, but `subset` as built above holds `(r1, r2)` pairs — another
// apparent arity mismatch to confirm.
subset.inspect_batch({
let result = result.clone();
move |_timestamp, facts| {
let mut result = result.lock().unwrap();
for ((r1, r2, location), _timestamp, multiplicity) in facts {
assert_eq!(*multiplicity, 1);
result
.subset
.entry(*location)
.or_insert(BTreeMap::new())
.entry(*r1)
.or_insert(BTreeSet::new())
.insert(*r2);
result.region_degrees.update_degrees(*r1, *r2, *location);
}
}
});
requires.inspect_batch({
let result = result.clone();
move |_timestamp, facts| {
let mut result = result.lock().unwrap();
for ((region, borrow, location), _timestamp, multiplicity) in facts {
assert_eq!(*multiplicity, 1);
result
.restricts
.entry(*location)
.or_insert(BTreeMap::new())
.entry(*region)
.or_insert(BTreeSet::new())
.insert(*borrow);
}
}
});
}
// Always record the final borrow_live_at relation, grouped by location.
borrow_live_at.inspect_batch({
let result = result.clone();
move |_timestamp, facts| {
let mut result = result.lock().unwrap();
for ((borrow, location), _timestamp, multiplicity) in facts {
assert_eq!(*multiplicity, 1);
result
.borrow_live_at
.entry(*location)
.or_insert(Vec::new())
.push(*borrow);
}
}
});
});
}
}).unwrap();
// Remove from the Arc and Mutex, since it is fully populated now.
// `execute_from_args` has joined all worker threads by this point, so ours
// should be the last `Arc` handle; panic otherwise.
Arc::try_unwrap(result)
.unwrap_or_else(|_| panic!("somebody still has a handle to this arc"))
.into_inner()
.unwrap()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment