Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nejat/696b275813e6f2c331a0a3e631613a1c to your computer and use it in GitHub Desktop.
Save Nejat/696b275813e6f2c331a0a3e631613a1c to your computer and use it in GitHub Desktop.
// in cargo.toml
// rand = "0.7"
// parking_lot = "0.11"
// crossbeam-channel = "0.5"
use std::{thread::{sleep, spawn}, time::Duration};
use crossbeam_channel::{RecvTimeoutError, Sender, unbounded};
use rand::{Rng, thread_rng};
const MESSAGES: u8 = 128;
const MESSAGE_GEN: usize = 256;
const DELAY_MAX: u64 = 5;
#[derive(Debug)]
enum EventA {
MsgOne,
MsgTwo,
MsgThree,
}
#[derive(Debug)]
enum EventB {
MsgOne,
MsgTwo,
MsgThree,
}
#[derive(Debug)]
enum EventC {
MsgOne,
MsgTwo,
MsgThree,
}
fn main() {
let (a_tx, a_rx) = unbounded();
let (b_tx, b_rx) = unbounded();
let (c_tx, c_rx) = unbounded();
generate_a(a_tx);
generate_b(b_tx);
generate_c(c_tx);
spawn(move || {
const RECEIVE_TIMEOUT: u64 = 5;
let mut done = 0u8;
loop {
match a_rx.recv_timeout(Duration::from_millis(RECEIVE_TIMEOUT)) {
Ok(event) => println!("A: {:?}", event),
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => done += 1
}
match b_rx.recv_timeout(Duration::from_millis(RECEIVE_TIMEOUT)) {
Ok(event) => println!("B: {:?}", event),
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => done += 1
}
match c_rx.recv_timeout(Duration::from_millis(RECEIVE_TIMEOUT)) {
Ok(event) => println!("C: {:?}", event),
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => done += 1
}
if done >= 3 {
break;
}
}
})
.join()
.unwrap();
}
fn generate_a(tx: Sender<EventA>) {
spawn(move || {
let mut rng = thread_rng();
let mut counter = 0u8;
loop {
counter += 1;
if counter > MESSAGES {
println!("A: Done");
break;
}
sleep(Duration::from_millis(rng.gen_range(0, DELAY_MAX)));
tx.send(
match rng.gen_range(0, MESSAGE_GEN) % 3 {
0 => EventA::MsgOne,
1 => EventA::MsgTwo,
2 => EventA::MsgThree,
_ => unreachable!()
}
).unwrap()
}
});
}
fn generate_b(tx: Sender<EventB>) {
spawn(move || {
let mut rng = thread_rng();
let mut counter = 0u8;
loop {
counter += 1;
if counter > MESSAGES {
println!("B: Done");
break;
}
sleep(Duration::from_millis(rng.gen_range(0, DELAY_MAX)));
tx.send(
match rng.gen_range(0, MESSAGE_GEN) % 3 {
0 => EventB::MsgOne,
1 => EventB::MsgTwo,
2 => EventB::MsgThree,
_ => unreachable!()
}
).unwrap()
}
});
}
fn generate_c(tx: Sender<EventC>) {
spawn(move || {
let mut rng = thread_rng();
let mut counter = 0u8;
loop {
counter += 1;
if counter > MESSAGES {
println!("C: Done");
break;
}
sleep(Duration::from_millis(rng.gen_range(0, DELAY_MAX)));
tx.send(
match rng.gen_range(0, MESSAGE_GEN) % 3 {
0 => EventC::MsgOne,
1 => EventC::MsgTwo,
2 => EventC::MsgThree,
_ => unreachable!()
}
).unwrap()
}
});
}
@Nejat
Copy link
Author

Nejat commented Oct 16, 2020

this example works with std::sync::mpsc::channel as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment