@mrsarm
Last active March 1, 2024 22:58
vector_threads
//! Multi-threaded Rust example that processes a vector of input data in chunks,
//! safely sharing it between threads with `Arc` and collecting the results
//! through a channel.
//!
//! ## Details
//!
//! Spawn `MAX_NTHREADS` threads. Each thread borrows its own slice of the
//! shared vector and, for each string in the slice, computes a new string and
//! sends it through a channel. The receiver in the main thread reads the
//! results as they arrive, while the child threads keep sending
//! new elements.
//!
//! Finally, the main thread prints the collected results once all child
//! threads have finished.
use rand::random; // external crate `rand`
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

const MAX_NTHREADS: usize = 4;

/// Pretends to do something useful: derives a new string from the
/// input element, taking a random amount of time (0 to 765 ms) to do it.
fn transformer(tid: usize, element: &str) -> String {
    thread::sleep(Duration::from_millis(3 * random::<u8>() as u64));
    format!("{}:{}", tid, element)
}

fn main() {
    // Read-only vector: wrapping it in `Arc` is enough to share it across any number of threads
    let input = Arc::new((0..40).map(|n| n.to_string()).collect::<Vec<String>>());
    let nthreads = MAX_NTHREADS;
    // Round up so that a length not divisible by `nthreads` leaves no element
    // unprocessed (otherwise the receive loop below would wait forever)
    let chunk_size = (input.len() + nthreads - 1) / nthreads;
    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();
    let mut tchildren = Vec::new();

    println!("Spawning {} threads ...", nthreads);
    for tid in 0..nthreads {
        let input = Arc::clone(&input);
        let thread_tx = tx.clone();
        let child = thread::spawn(move || {
            println!("Running thread {} ...", tid);
            // Clamp both bounds: the last chunks may be shorter, or empty
            let slice_start = (tid * chunk_size).min(input.len());
            let slice_end = (slice_start + chunk_size).min(input.len());
            for e in &input[slice_start..slice_end] {
                thread_tx.send(transformer(tid, e)).unwrap();
            }
        });
        tchildren.push(child);
    }
    println!("... spawning ended.");

    // Collect all the messages here: exactly one per input element
    let mut output: Vec<String> = Vec::with_capacity(input.len());
    for _ in 0..input.len() {
        // `recv` picks a message from the channel, blocking the
        // current thread while no message is available
        let out = rx.recv().unwrap();
        println!(">> {out}");
        output.push(out);
    }

    // Wait for the threads to complete any remaining work
    for child in tchildren {
        child.join().expect("oops! the child thread panicked");
    }

    dbg!(output);
}

mrsarm commented Mar 1, 2024

A more efficient implementation that takes elements out of the input vector one by one but from multiple threads is here: https://gist.github.com/mrsarm/28ff763ccc710e6a2875c257cb03edee
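
For readers who only want the idea, here is a minimal sketch of one way a one-by-one approach could work. It is not the code from the linked gist: the helper name `process_one_by_one` and the shared atomic counter are assumptions made for illustration. Each worker claims the next unprocessed index, so a fast thread is never left idle while a slow one works through a fixed chunk.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical helper (not from the gist): every worker repeatedly claims the
/// next unprocessed index from a shared atomic counter until the input runs out.
fn process_one_by_one(input: Vec<String>, nthreads: usize) -> Vec<String> {
    let input = Arc::new(input);
    let next = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel::<String>();

    let workers: Vec<_> = (0..nthreads)
        .map(|tid| {
            let input = Arc::clone(&input);
            let next = Arc::clone(&next);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // `fetch_add` returns the previous value, so each index
                // is handed out to exactly one thread.
                let i = next.fetch_add(1, Ordering::Relaxed);
                let Some(element) = input.get(i) else { break };
                tx.send(format!("{}:{}", tid, element)).unwrap();
            })
        })
        .collect();

    // Drop the original sender so the receiver stops once every worker is done.
    drop(tx);
    let output: Vec<String> = rx.iter().collect();

    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    output
}

fn main() {
    let input: Vec<String> = (0..40).map(|n| n.to_string()).collect();
    let output = process_one_by_one(input, 4);
    println!("{} elements processed", output.len());
}
```

Because the main thread drops its own `tx` and drains the receiver until the channel closes, it does not need to know in advance how many messages to expect. The order of the results, and which thread handles which element, varies between runs.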
