-
-
Save spastorino/2913826a97e56aa5f3470152d3ec4c04 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Timely dataflow runs on its own thread.
use crate::facts::AllFacts; | |
use crate::output::Output; | |
use differential_dataflow::collection::Collection; | |
use differential_dataflow::operators::*; | |
use std::collections::{BTreeMap, BTreeSet}; | |
use std::mem; | |
use std::sync::mpsc; | |
use std::sync::Arc; | |
use std::sync::Mutex; | |
use timely; | |
use timely::dataflow::operators::*; | |
pub(super) fn compute(dump_enabled: bool, all_facts: AllFacts) -> Output { | |
let result = Arc::new(Mutex::new(Output::new(dump_enabled))); | |
// Use a channel to send `all_facts` to one worker (and only one) | |
let (tx, rx) = mpsc::channel(); | |
tx.send(all_facts).unwrap(); | |
mem::drop(tx); | |
let rx = Mutex::new(rx); | |
timely::execute_from_args(vec![].into_iter(), { | |
let result = result.clone(); | |
move |worker| { | |
// First come, first serve: one worker gets all the facts; | |
// the others get empty vectors. | |
let my_facts = rx.lock() | |
.unwrap() | |
.recv() | |
.unwrap_or_else(|_| AllFacts::default()); | |
worker.dataflow::<(), _, _>(|scope| { | |
macro_rules! let_collections { | |
(let ($($facts:ident,)*) = ..$base:expr;) => { | |
let ($($facts),*) = ( | |
$(Collection::<_, _, isize>::new( | |
$base.$facts | |
.to_stream(scope) | |
.map(|datum| (datum, Default::default(), 1)), | |
),)* | |
); | |
} | |
} | |
let_collections! { | |
let ( | |
borrow_region, | |
universal_region, | |
cfg_edge, | |
killed, | |
outlives, | |
region_live_at, | |
) = ..my_facts; | |
} | |
// .decl subset(Ra, Rb) -- `R1 <= R2` holds | |
// | |
// subset_base(Ra, Rb) :- outlives(Ra, Rb, _P) | |
let subset_base = outlives | |
.map(|(r_a, r_b, _p)| (r_a, r_b)) | |
.distinct_total(); | |
// compute transitive closure of the subset relation | |
// -- just an example though, we don't really need to do this I don't think | |
// REMOVE THIS | |
let subset = subset_base.iterate(|subset| { | |
let subset_base = subset_base.enter(&subset.scope()); | |
// subset(Ra, Rb) :- subset_base(Ra, Rb) | |
let subset1 = subset_base.clone(); | |
// subset(Ra, Rc) :- | |
// subset(Ra, Rb), | |
// subset_base(Rb, Rc). | |
let subset2 = | |
subset.map(|(r_a, r_b)| (r_b, r_a)) | |
.join(&subset_base) | |
.map(|(r_b, r_a, r_c)| (r_a, r_c)); | |
subset1.concat(&subset2).distinct_total() | |
}); | |
// requires(R, L) :- borrow_region(R, L, _P). | |
let requires = borrow_region | |
.map(|(r, l, _p)| (r, l)) | |
.distinct_total(); | |
// requires(R2, L) :- requires(R1, L), subset(R1, R2). | |
let requires = requires.iterate(|requires| { | |
let subset_base = subset_base.enter(&requires.scope()); | |
requires | |
.join(&subset_base) | |
.map(|(r1, l, r2)| (r2, l)) | |
.distinct_total() | |
}); | |
//potential_errors(L, P) :- | |
// invalidated(L, P), | |
// requires(R, L), | |
// region_live_at(R, P). | |
// .decl borrow_live_at(B, P) -- true if the restrictions of the borrow B | |
// need to be enforced at the point P | |
let borrow_live_at = { | |
// borrow_live_at(B, P) :- requires(R, B, P), region_live_at(R, P) | |
let borrow_live_at1 = requires | |
.map(|(r, b, p)| ((r, p), b)) | |
.semijoin(®ion_live_at) | |
.map(|((_r, p), b)| (b, p)); | |
// borrow_live_at(B, P) :- requires(R, B, P), universal_region(R). | |
let borrow_live_at2 = requires | |
.map(|(r, b, p)| (r, (p, b))) | |
.semijoin(&universal_region) | |
.map(|(_r, (p, b))| (b, p)); | |
borrow_live_at1.concat(&borrow_live_at2).distinct() | |
}; | |
if dump_enabled { | |
region_live_at.inspect_batch({ | |
let result = result.clone(); | |
move |_timestamp, facts| { | |
let mut result = result.lock().unwrap(); | |
for ((region, location), _timestamp, multiplicity) in facts { | |
assert_eq!(*multiplicity, 1); | |
result | |
.region_live_at | |
.entry(*location) | |
.or_insert(vec![]) | |
.push(*region); | |
} | |
} | |
}); | |
subset.inspect_batch({ | |
let result = result.clone(); | |
move |_timestamp, facts| { | |
let mut result = result.lock().unwrap(); | |
for ((r1, r2, location), _timestamp, multiplicity) in facts { | |
assert_eq!(*multiplicity, 1); | |
result | |
.subset | |
.entry(*location) | |
.or_insert(BTreeMap::new()) | |
.entry(*r1) | |
.or_insert(BTreeSet::new()) | |
.insert(*r2); | |
result.region_degrees.update_degrees(*r1, *r2, *location); | |
} | |
} | |
}); | |
requires.inspect_batch({ | |
let result = result.clone(); | |
move |_timestamp, facts| { | |
let mut result = result.lock().unwrap(); | |
for ((region, borrow, location), _timestamp, multiplicity) in facts { | |
assert_eq!(*multiplicity, 1); | |
result | |
.restricts | |
.entry(*location) | |
.or_insert(BTreeMap::new()) | |
.entry(*region) | |
.or_insert(BTreeSet::new()) | |
.insert(*borrow); | |
} | |
} | |
}); | |
} | |
borrow_live_at.inspect_batch({ | |
let result = result.clone(); | |
move |_timestamp, facts| { | |
let mut result = result.lock().unwrap(); | |
for ((borrow, location), _timestamp, multiplicity) in facts { | |
assert_eq!(*multiplicity, 1); | |
result | |
.borrow_live_at | |
.entry(*location) | |
.or_insert(Vec::new()) | |
.push(*borrow); | |
} | |
} | |
}); | |
}); | |
} | |
}).unwrap(); | |
// Remove from the Arc and Mutex, since it is fully populated now. | |
Arc::try_unwrap(result) | |
.unwrap_or_else(|_| panic!("somebody still has a handle to this arc")) | |
.into_inner() | |
.unwrap() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment