Skip to content

Instantly share code, notes, and snippets.

@junha1
Created April 3, 2020 10:03
Show Gist options
  • Save junha1/228b6ab5c85ba76901df19d441b5fe37 to your computer and use it in GitHub Desktop.
Save junha1/228b6ab5c85ba76901df19d441b5fe37 to your computer and use it in GitHub Desktop.
Service handler using Rust async
use async_std::sync::{channel, Receiver};
use futures::future::FutureExt;
use rand::rngs::SmallRng;
use rand::FromEntropy;
use rand::Rng;
use std::thread;
use std::time;
use tokio::time::delay_for;
/// Outcome produced by every future that `smart_consume` keeps in its pool.
///
/// Both kinds of future (waiting on the request channel, and running a unit
/// of work) are mapped into this one type so that they can be stored in the
/// same collection and raced against each other.
///
/// `Eq` is derived alongside `PartialEq` because both payload types
/// (`String` and `usize`) have total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
enum HaveToWait {
    /// A unit of work finished; carries the text it produced.
    Complete(String),
    /// A value was received from the request channel; carries that value.
    NewRequest(usize),
}
/// Simulates one unit of asynchronous work for the request `x`.
///
/// Waits for a delay whose length in milliseconds is drawn from
/// `gen_range(10, 50)` on a freshly seeded `SmallRng`, then produces the
/// result text.
///
/// Returns an empty string when `x` is 999 (`smart_consume` treats an empty
/// result as its signal to stop); for every other value returns
/// `"Wow, I consumed {x}"`.
async fn do_something2(x: usize) -> String {
    let pause_ms = SmallRng::from_entropy().gen_range(10, 50);
    delay_for(time::Duration::from_millis(pause_ms)).await;

    match x {
        // Sentinel request: answer with an empty message.
        999 => "".to_owned(),
        other => format!("Wow, I consumed {}", other),
    }
}
/// Receives requests from `recv` and runs `do_something2` for each of them,
/// keeping many requests in flight at once.
///
/// A single pool holds two kinds of boxed futures, both yielding
/// `HaveToWait`: one future waiting for the next value on the channel, and
/// one future per request that is being processed. Each loop iteration blocks
/// on `select_all` over the pool (using a tokio runtime created here) and
/// handles whichever future finishes first.
///
/// Every completed message is printed with `{:?}`. The function returns as
/// soon as a completed message is empty; any futures still left in the pool
/// at that moment are dropped without being driven to completion.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created, or if the `unwrap` on the
/// value yielded by `recv.recv()` fails.
fn smart_consume(recv: Receiver<usize>) {
    let mut pending = Vec::new();
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    // True whenever the channel-waiting future has just been consumed and a
    // fresh one has to be added before the next `select_all`.
    let mut need_recv = true;

    loop {
        if need_recv || pending.is_empty() {
            let next_request = recv
                .recv()
                .map(|item| HaveToWait::NewRequest(item.unwrap()));
            pending.push(next_request.boxed());
            need_recv = false;
        }

        // `select_all` takes the pool by value and hands back the futures
        // that have not finished yet.
        let (finished, _index, mut remaining) =
            rt.block_on(futures::future::select_all(pending));

        match finished {
            HaveToWait::NewRequest(arg) => {
                // new request
                remaining.push(do_something2(arg).map(HaveToWait::Complete).boxed());
                need_recv = true;
            }
            HaveToWait::Complete(msg) => {
                println!("{:?}", msg);
                // An empty message is the stop signal.
                if msg.is_empty() {
                    return;
                }
            }
        }

        pending = remaining;
    }
}
/// Runs the producer/consumer demo end to end.
///
/// Creates a channel with capacity 1000, spawns a thread that runs
/// `smart_consume` on the receiving half, then sends the values `0..1000`
/// one by one. After each send the calling thread sleeps for a number of
/// milliseconds drawn from `gen_range(1, 10)`. Finally it waits for the
/// consumer thread to finish.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created or if the consumer thread
/// panicked.
pub fn complex() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();
    let (send, recv) = channel(1000);
    let consumer = thread::spawn(move || smart_consume(recv));
    let mut rng = SmallRng::from_entropy();

    for request in 0..1000 {
        // The send is async; drive it to completion on this thread.
        runtime.block_on(send.send(request));

        let gap_ms = rng.gen_range(1, 10);
        thread::sleep(time::Duration::from_millis(gap_ms));
    }

    consumer.join().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment