Skip to content

Instantly share code, notes, and snippets.

@jesperdj
Created August 6, 2020 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jesperdj/9ecf6ade50b15d27efd8ff0ceb2205a0 to your computer and use it in GitHub Desktop.
Save jesperdj/9ecf6ade50b15d27efd8ff0ceb2205a0 to your computer and use it in GitHub Desktop.
use crossbeam_channel;
use crossbeam_utils::thread;
/// Multi-threaded processing using Crossbeam.
///
/// This function takes a generator which is an `Iterator` over values of type `V`, a processor function that processes values of type `V` into results of
/// type `R`, and an aggregator function which accepts results of type `R`.
///
/// It starts one generator thread and a number of processor threads. The generator thread gets values from the generator and sends them to an input channel.
/// The processor threads receive from the input channel, call the processor function to compute a result for each value and send the results to an output
/// channel. The main thread receives from the output channel and aggregates the results using the aggregator function.
fn process<V, R, G, P, A>(mut generate: G, process: &P, aggregate: &mut A) -> std::thread::Result<()>
where
V: Sync + Send,
R: Sync + Send,
G: Iterator<Item=V> + Sync + Send,
P: Fn(V) -> R + Sync + Send,
A: FnMut(R),
{
const PROCESSOR_COUNT: usize = 4;
const INPUT_CHANNEL_CAPACITY: usize = 1000;
const OUTPUT_CHANNEL_CAPACITY: usize = 1000;
let (input_snd, input_rcv) = crossbeam_channel::bounded(INPUT_CHANNEL_CAPACITY);
let (output_snd, output_rcv) = crossbeam_channel::bounded(OUTPUT_CHANNEL_CAPACITY);
thread::scope(|scope| {
{
let generator_snd = input_snd.clone();
scope.spawn(move |_| {
println!("generator started");
while let Some(value) = generate.next() {
generator_snd.send(value).unwrap();
}
println!("generator finished");
});
}
for id in 1..=PROCESSOR_COUNT {
let processor_rcv = input_rcv.clone();
let processor_snd = output_snd.clone();
scope.spawn(move |_| {
println!("thread {} started", id);
for value in processor_rcv {
let result = process(value);
processor_snd.send(result).unwrap();
}
println!("thread {} finished", id);
});
}
drop(input_snd);
drop(input_rcv);
drop(output_snd);
println!("aggregator started");
for result in output_rcv {
aggregate(result);
}
println!("aggregator finished");
})
}
fn main() {
let xs = vec![1, 5, 34, -2, 6, 10];
let prc = |x| x * 2 + 1;
let mut total = 0;
let mut agg = |x| total += x;
process(xs.iter(), &prc, &mut agg).unwrap();
println!("total = {}", total);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment