Skip to content

Instantly share code, notes, and snippets.

@zeddee
Last active August 8, 2023 23:18
Show Gist options
  • Save zeddee/ab9a270dd8669d8b83e37f644e7d258f to your computer and use it in GitHub Desktop.
Save zeddee/ab9a270dd8669d8b83e37f644e7d258f to your computer and use it in GitHub Desktop.
use rand::Rng;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn sleep(range: Range<usize>) {
thread::sleep(Duration::from_millis(
rand::thread_rng().gen_range(range) as u64
));
}
fn kick_off(label: char, data: Vec<char>, tx: mpsc::Sender<char>) -> thread::JoinHandle<()> {
thread::spawn(move || {
// not sure what move does here
for i in data {
match tx.send(i) {
Ok(_) => {
println!("thread {label}: sending {:?}", i);
}
Err(e) => println!("Err: {:?}", e),
}
sleep(0..10);
}
})
}
fn kick_receiver(label: char, rx: mpsc::Receiver<char>) -> thread::JoinHandle<Vec<char>> {
thread::spawn(move || {
let max_retries = 10;
let mut retry_counter = 0;
let mut received: Vec<char> = Vec::new();
loop {
match rx.try_recv() {
// Use try_recv so that each recv() doesn't unintentionally block the thread
// from progressing if it ends up stuck on an empty response.
Ok(x) if x == 'πŸ‘' => {
// Improvised SIGTERM
println!("Received termination signal: {x}");
break;
}
Ok(x) => {
retry_counter = 0; // reset retries
println!("{label} received: {x}");
received.push(x);
}
Err(e) if e == mpsc::TryRecvError::Empty => {
if retry_counter < max_retries {
retry_counter += 1;
println!("Retries: {retry_counter}");
sleep(4..5);
continue;
} else {
println!("HIT MAX RETRIES {max_retries}");
break;
}
}
Err(e) => {
println!("Receiver error: {:?}", e);
}
}
}
println!("COLLECTING");
received
})
}
fn main() {
let mut txers: HashMap<char, thread::JoinHandle<()>> = HashMap::new();
let mut rxers: HashMap<char, thread::JoinHandle<Vec<char>>> = HashMap::new();
let (tx, rx) = mpsc::channel();
let stack = vec![
'πŸ’©', '🚽', 'πŸ’€', 'πŸͺ“', '🦨', 'πŸ„', 'πŸ’₯', '🚧', 'πŸ”ͺ', 'πŸ’£', '🧨',
];
// Kick off tx and rx threads
txers
.entry('πŸ’©')
.or_insert(kick_off('πŸ’©', stack.clone(), tx.clone()));
txers
.entry('πŸ™ƒ')
.or_insert(kick_off('πŸ™ƒ', stack.clone(), tx.clone()));
rxers.entry('πŸ›’').or_insert(kick_receiver('πŸ›’', rx));
for i in 1..5 {
// this will occur in sequence -- is synchronous
println!("hello! i am {i} from the MAIN thread");
sleep(0..10);
}
for (label, handle) in txers {
if let Some(e) = handle.join().err() {
println!("{label} Err: {:?}", e);
}
}
kick_off('πŸ‘', vec!['πŸ‘'], tx.clone()); // improvised SIGTERM
let mut results = HashMap::new();
for (label, handle) in rxers {
let res = handle.join().expect("Error on {label}");
results.entry(label).or_insert(res);
}
for result in results {
println!("RESULTS: {:?}", result);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment