This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn merge(merged_result_chan: Sender<PipelineMsg>) -> Sender<PipelineMsg> { | |
let (chan, port) = channel(); | |
let _ = thread::Builder::new().spawn(move || { | |
// Iteration will stop when all senders are dropped. | |
// This happens when the workers stop their iteration, | |
// and drop their "merge_chan"... | |
for msg in port { | |
let squared = match msg { | |
PipelineMsg::Squared(num) => num, | |
_ => panic!("unexpected message receiving at merge stage"), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn main() { | |
let (results_chan, results_port) = channel(); | |
let (gen_chan, gen_port) = channel(); | |
let merge_chan = merge(results_chan); | |
{ | |
// New scope starts here. | |
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()), | |
square(merge_chan)] | |
.into_iter() | |
.collect(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Results chan, to get the final merged results to the main thread | |
let (results_chan, results_port) = channel(); | |
// Gen chan, to get generated numbers from the "generator". | |
let (gen_chan, gen_port) = channel(); | |
// Share the sender of the results chan to the merge component. | |
let merge_chan = merge(results_chan); | |
// Start the workers, each worker receives a clone of the "merge chan sender". | |
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()), | |
square(merge_chan.clone())] | |
.into_iter() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for msg in gen_port { | |
// Receive generated numbers from the "generate" stage. | |
let generated_num = match msg { | |
PipelineMsg::Generated(num) => num, | |
_ => panic!("unexpected message receiving from gen stage"), | |
}; | |
// Cycle through the workers and distribute work. | |
let worker = square_workers.pop_front().unwrap(); | |
let _ = worker.send(msg); | |
square_workers.push_back(worker); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for result in results_port { | |
// Receive "merged results" from the "merge" stage. | |
match result { | |
PipelineMsg::Merged(_) => continue, | |
_ => panic!("unexpected result"), | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
// New scope | |
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()), | |
square(merge_chan.clone())] | |
.into_iter() | |
.collect(); | |
generate(gen_chan); | |
// When we drop the gen_port, generate will quit. | |
for msg in gen_port { | |
let generated_num = match msg { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Cloning merge_chan first, then moving it. | |
// This move ensures that when square_workers drop, | |
// and the workers quit their loop, | |
// all merge_chan senders drop as well, | |
// triggering the quit of the loop in the "merge" stage... | |
let mut square_workers: VecDeque<Sender<PipelineMsg>> = vec![square(merge_chan.clone()), | |
square(merge_chan)] | |
.into_iter() | |
.collect(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The channel to receive results from the consumer. | |
let (results_sender, results_receiver) = channel(); | |
// The channel to receive work from the producer. | |
let (work_sender, work_receiver) = channel(); | |
// A few "global" settings. | |
let number_of_workflows = 5; | |
let number_of_steps = 4; | |
// Start the consumer, passing along the sender of the "results channel". | |
// Note the return value is a sender for the "consumer channel". | |
let consumer_sender = start_consumer(results_sender, number_of_workflows, number_of_steps); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Note select is unstable, | |
// consider using crossbeam-channel instead... | |
#![feature(mpsc_select)] | |
loop { | |
let msg = { | |
let sel = Select::new(); | |
// Selecting over our "work receiver", | |
// corresponding to the sender shared with the "producer" | |
// And our "results receiver", corresponding |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Handle the msg from the select. | |
let result = match msg { | |
// We got work from the producer... | |
MainMsg::FromProducer(ProducerMsg::Incoming(workflow)) => { | |
// Send info to the consumer, telling it to expect this piece of work | |
// This is really just for testing in this case... | |
let _ = consumer_sender.send(ConsumerMsg::ExpectWorkflow(workflow.id)); | |
if let Some(executor) = executor_queue.pop_front() { | |
// Send work to the worker at the front of the queue. | |
let _ = executor.send(ExecutorMsg::Execute(workflow)); |