Skip to content

Instantly share code, notes, and snippets.

@mrsarm

mrsarm/main.rs Secret

Created March 1, 2024 22:56
Show Gist options
  • Save mrsarm/28ff763ccc710e6a2875c257cb03edee to your computer and use it in GitHub Desktop.
Save mrsarm/28ff763ccc710e6a2875c257cb03edee to your computer and use it in GitHub Desktop.
vector_threads_and_channels
//! Multi-threaded Rust example that consumes a vector of input data
//! from several threads in a safe way, using the threads and channels APIs
//! together with the `Arc` and `Mutex` types.
//!
//! ## Details
//!
//! Spawn `MAX_NTHREADS` threads. Each thread repeatedly locks the shared
//! vector just long enough to pop one string from it (the element is taken
//! out of the vector), processes that string to calculate a new string,
//! and publishes the result into a channel. The main thread listens to
//! that channel through a receiver while the child threads keep sending
//! newly processed results.
//!
//! Finally, print the result from the main thread, after all child
//! threads have finished.
use rand::random;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::time::Duration;
use std::thread;
/// Number of worker threads spawned by `main` (it always spawns exactly
/// this many).
static MAX_NTHREADS: usize = 4;
/// Simulates an expensive computation over one piece of input data.
///
/// Blocks the calling thread for `3 * r` milliseconds, where `r` is a
/// random `u8` (so anywhere from 0 to 765 ms), and then returns `element`
/// prefixed with the worker id, formatted as `"<tid>:<element>"`.
fn transformer(tid: usize, element: &str) -> String {
    // Widen before multiplying so the product is computed in `u64`.
    let jitter = u64::from(random::<u8>());
    let pause = Duration::from_millis(jitter * 3);
    thread::sleep(pause);
    format!("{tid}:{element}")
}
/// Fans a vector of strings out to `MAX_NTHREADS` worker threads and
/// gathers the transformed results over a channel.
///
/// Each worker repeatedly pops one element from the shared vector, runs
/// `transformer` on it and sends the result to the main thread. The main
/// thread prints and collects every result until the channel closes, then
/// joins the workers and dumps the collected output.
///
/// # Panics
///
/// Panics if any worker thread panicked (reported when the workers are
/// joined).
fn main() {
    let num_elements = 40; // Elements created inside the input vector for testing

    // Input vector, wrapped in an `Arc<Mutex<_>>` so it can be consumed from
    // the threads. At the end of the execution of all threads the vector
    // will have all its elements consumed (it will be empty).
    let input = Arc::new(Mutex::new(
        (0..num_elements)
            .map(|n| n.to_string())
            .collect::<Vec<String>>(),
    ));
    let nthreads = MAX_NTHREADS;
    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();
    let mut tchildren = Vec::with_capacity(nthreads);

    println!("Spawning {} threads ...", nthreads);
    for tid in 0..nthreads {
        let input = Arc::clone(&input);
        let thread_tx = tx.clone();
        let child = thread::spawn(move || {
            println!("Running scoped thread {} ...", tid);
            loop {
                // The mutex guard is a temporary of this `let` statement, so
                // the lock is released as soon as the pop completes and is
                // never held during the slow transformation. Do not fold
                // this into `while let`: the guard would then stay alive for
                // the whole loop body and serialize the workers.
                let next = input.lock().expect("input mutex poisoned").pop();
                match next {
                    Some(el) => thread_tx
                        .send(transformer(tid, &el))
                        .expect("receiver is alive until every sender is dropped"),
                    None => break, // The vector got empty
                }
            }
        });
        tchildren.push(child);
    }
    // Drop the original sender so that only the workers' clones remain. The
    // channel then closes once every worker has finished (or panicked), and
    // the receive loop below ends instead of blocking forever on a message
    // that will never arrive.
    drop(tx);
    println!("... spawning ended.");

    // Here, all the messages are collected. Iterating the receiver blocks
    // while no message is available and stops when all senders are gone.
    let mut output: Vec<String> = Vec::with_capacity(num_elements);
    for out in rx {
        println!(">> {out}");
        output.push(out);
    }

    // Wait for the threads to complete any remaining work
    for child in tchildren {
        child.join().expect("oops! the child thread panicked");
    }
    dbg!(output);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment