Skip to content

Instantly share code, notes, and snippets.

View gterzian's full-sized avatar

Gregory Terzian gterzian

View GitHub Profile
fn merge(merged_result_chan: Sender<PipelineMsg>) -> Sender<PipelineMsg> {
let (chan, port) = channel();
let _ = thread::Builder::new().spawn(move || {
// Iteration will stop when all senders are dropped.
// This happens when the worker stop their iteration,
// and drop their "merge_chan"...
for msg in port {
let squared = match msg {
PipelineMsg::Squared(num) => num,
_ => panic!("unexpected message receiving at merge stage"),
fn main() {
let (results_chan, results_port) = channel();
let (gen_chan, gen_port) = channel();
let merge_chan = merge(results_chan);
{
// New scope starts here.
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()),
square(merge_chan)]
.into_iter()
.collect();
// Results chan, to get the final merged results to the main thread
let (results_chan, results_port) = channel();
// Gen chan, to get generated numbers from the "generator".
let (gen_chan, gen_port) = channel();
// Share the sender of the results chan to the merge component.
let merge_chan = merge(results_chan);
// Start the workers, each worker receives a clone of the "merge chan sender".
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()),
square(merge_chan.clone())]
.into_iter()
for msg in gen_port {
// Receive generated numbers from the "generate" stage.
let generated_num = match msg {
PipelineMsg::Generated(num) => num,
_ => panic!("unexpected message receiving from gen stage"),
};
// Cycle through the workers and distribute work.
let worker = square_workers.pop_front().unwrap();
let _ = worker.send(msg);
square_workers.push_back(worker);
for result in results_port {
// Receive "merged results" from the "merge" stage.
match result {
PipelineMsg::Merged(_) => continue,
_ => panic!("unexpected result"),
}
}
{
// New scope
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()),
square(merge_chan.clone())]
.into_iter()
.collect();
generate(gen_chan);
// When we drop the gen_port, generate will quit.
for msg in gen_port {
let generated_num = match msg {
// Cloning merge_chan first, then moving it.
// This move ensures that when square_workers drop,
// and the workers quit their loop,
// all merge_chan senders drop as well,
// triggerign the quit of the loop in the "merge" stage...
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()),
square(merge_chan)]
.into_iter()
.collect();
// The channel to receive results from the consumer.
let (results_sender, results_receiver) = channel();
// The channel to receive work from the producer.
let (work_sender, work_receiver) = channel();
// A few "global" settings.
let number_of_workflows = 5;
let number_of_steps = 4;
// Start the consumer, passing along the sender of the "resuts channel".
// Note the return value is a sender for the "consumer channel".
let consumer_sender = start_consumer(results_sender, number_of_workflows, number_of_steps);
// Note select is unstable,
// consider using crossbeam-channel instead...
#![feature(mpsc_select)]
loop {
let msg = {
let sel = Select::new();
// Selecting over our "work receiver",
// corresponding to the sender shared with the "producer"
// And our "results receiver", corresponding
// Handle the msg from the select.
let result = match msg {
// We got work from the producer...
MainMsg::FromProducer(ProducerMsg::Incoming(workflow)) => {
// Send info to the consumer, telling it to expect this piece of work
// This is really just for testing in this case...
let _ = consumer_sender.send(ConsumerMsg::ExpectWorkflow(workflow.id));
if let Some(executor) = executor_queue.pop_front() {
// Send work to the worker at the front of the qeue.
let _ = executor.send(ExecutorMsg::Execute(workflow));