Skip to content

Instantly share code, notes, and snippets.

@li1
Created August 19, 2019 15:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save li1/9af1988664a4f3ad4492d4993b779eef to your computer and use it in GitHub Desktop.
Save li1/9af1988664a4f3ad4492d4993b779eef to your computer and use it in GitHub Desktop.
ST2 broadcast join
use std::time::Duration;
use std::sync::{Mutex, Arc};
use std::path::PathBuf;
use tdiag_connect::receive as connect;
use tdiag_connect::receive::ReplaySource;
use timely::dataflow::operators::capture::replay::Replay;
use timely::dataflow::operators::inspect::Inspect;
use timely::logging::TimelyEvent;
use timely::dataflow::Stream;
use timely::logging::WorkerIdentifier;
use timely::dataflow::operators::delay::Delay;
use timely::dataflow::operators::map::Map;
use timely::logging::TimelyEvent::{Messages, Progress, Schedule, Operates};
use timely::dataflow::operators::filter::Filter;
use timely::dataflow::operators::exchange::Exchange;
use timely::dataflow::channels::pact::Pipeline;
use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::operators::join::Join;
use differential_dataflow::operators::reduce::Threshold;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
/// Replays timely logging events from `source_peers` instrumented workers and
/// analyses the operator hierarchy they describe.
///
/// Usage: `<binary> <source_peers> [timely options]`. The first positional
/// argument is the number of source workers whose logs are read. The full
/// argument list is also handed to `timely::execute_from_args` to configure
/// this analysis computation.
///
/// The dataflow prints:
/// * the ids of leaf operators (operators whose address is not the parent of
///   any other operator's address),
/// * the ids of scope operators (operators whose address is such a parent),
/// * every `Schedule` event that belongs to a scope operator.
///
/// # Panics
///
/// Panics in these cases:
/// * the first argument is missing or is not a valid `usize`;
/// * the replay readers cannot be created;
/// * the timely computation fails.
fn main() {
    let source_peers: usize = std::env::args()
        .nth(1)
        .expect("usage: <source_peers> [timely args]: missing number of source peers")
        .parse()
        .expect("source_peers must be a non-negative integer");
    // `None` selects the TCP replay source; a directory would select `*.dump` files instead.
    let path: Option<String> = None;
    let replay_source = make_replay_source(source_peers, path);

    timely::execute_from_args(std::env::args(), move |worker| {
        // read replayers from file (offline) or TCP stream (online)
        let readers = connect::make_readers(replay_source.clone(), worker.index(), worker.peers())
            .expect("couldn't create readers");
        let peers = worker.peers();

        worker.dataflow::<Duration, _, _>(move |scope| {
            let stream: Stream<_, (Duration, WorkerIdentifier, TimelyEvent)> =
                readers.replay_into(scope);

            // Operator definitions, taken from source worker 0 only, keyed by address.
            let operates = stream
                .filter(|(_, w, _)| *w == 0)
                .flat_map(|(t, _, x)| {
                    if let Operates(event) = x {
                        Some((event, t, 1isize))
                    } else {
                        None
                    }
                })
                .as_collection()
                .map(|event| (event.addr, (event.id, event.name)));

            // Addresses that are the parent of at least one operator's address.
            let scopes = operates
                .map(|(mut addr, _)| {
                    addr.pop();
                    addr
                })
                .distinct();

            // Broadcast join: every operator id is replicated to all `peers` workers,
            // so each worker can join its local (non-exchanged) schedule events.

            // Leaf operators: their address is nobody's parent.
            let inner = operates
                .antijoin(&scopes)
                .map(|(_addr, (id, _name))| id)
                .inspect(|x| println!("{:?}", x))
                .flat_map(move |id| (0..peers).map(move |wid| (wid, id)))
                .inner
                .exchange(|((wid, _id), _t, _diff)| *wid as u64)
                .map(|((_wid, id), t, diff)| ((id, ()), t, diff))
                .as_collection();
            // Currently unused: nothing below joins against this arrangement.
            let _inner_arranged: Arranged<_, TraceAgent<OrdValSpine<usize, _, Duration, isize>>> =
                inner.arrange_core(Pipeline, "inner");

            // Scope operators: their address is the parent of some other operator.
            let outer = operates
                .semijoin(&scopes)
                .map(|(_addr, (id, _name))| id)
                .inspect(|x| println!("{:?}", x))
                .flat_map(move |id| (0..peers).map(move |wid| (wid, id)))
                .inner
                .exchange(|((wid, _id), _t, _diff)| *wid as u64)
                .map(|((_wid, id), t, diff)| ((id, ()), t, diff))
                .as_collection();
            let outer_arranged: Arranged<_, TraceAgent<OrdValSpine<usize, _, Duration, isize>>> =
                outer.arrange_core(Pipeline, "outer");

            // `Schedule` events from all source workers, keyed by the scheduled operator's id.
            // Extracting the key in the same `flat_map` avoids a second pass with `unreachable!()`.
            let schedules: Arranged<_, TraceAgent<OrdValSpine<usize, _, Duration, isize>>> = stream
                .flat_map(|(t, w, x)| {
                    if let Schedule(s) = &x {
                        let id = s.id;
                        Some(((id, (t, w, x)), t, 1isize))
                    } else {
                        None
                    }
                })
                .as_collection()
                .arrange_core(Pipeline, "schedules");

            // Keep only schedule events of scope operators. `inspect` on a collection
            // sees `(data, time, diff)`, where `data` is the `(time, worker, event)` triple.
            schedules
                .join_core(&outer_arranged, |_id, event, _| Some(event.clone()))
                .inspect(|(event, time, diff)| println!("{:?}, {:?}, {:?}", event, time, diff));
        });
    })
    .unwrap();
}
/// Builds the `ReplaySource` from which logged timely events are read.
///
/// * `source_peers` – the number of instrumented workers whose logs are read.
/// * `path` – determines where the logs come from:
///   * `Some(dir)`: read the offline dumps `dir/0.dump` through
///     `dir/{source_peers - 1}.dump`;
///   * `None`: listen for `source_peers` TCP connections on `127.0.0.1:1234`.
///
/// # Panics
///
/// Panics if the TCP sockets cannot be opened.
fn make_replay_source(source_peers: usize, path: Option<String>) -> ReplaySource {
    if let Some(path) = path {
        println!("Reading from {} *.dump files", source_peers);
        // Join path components rather than string-formatting them, so the
        // platform separator is used and a trailing separator on `path` is handled.
        let dir = PathBuf::from(path);
        let files = (0..source_peers)
            .map(|idx| Some(dir.join(format!("{}.dump", idx))))
            .collect::<Vec<_>>();
        ReplaySource::Files(Arc::new(Mutex::new(files)))
    } else {
        // A constant loopback address: no string parsing, so no panic path.
        let ip_addr = std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST);
        let port: u16 = 1234;
        println!("Listening for {} connections on {}:{}", source_peers, ip_addr, port);
        let sockets =
            connect::open_sockets(ip_addr, port, source_peers).expect("couldn't open sockets");
        ReplaySource::Tcp(Arc::new(Mutex::new(sockets)))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment