Skip to content

Instantly share code, notes, and snippets.

@hinzundcode
Created February 5, 2020 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hinzundcode/c21b93cfd8eb8b28a1968ed564f71cdb to your computer and use it in GitHub Desktop.
Save hinzundcode/c21b93cfd8eb8b28a1968ed564f71cdb to your computer and use it in GitHub Desktop.
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_assignments)]
#![allow(unused_mut)]
#![allow(unused_imports)]
#![allow(unused_macros)]
#![allow(unreachable_code)]
use futures::lock::Mutex;
use futures::future::select_all;
use futures::prelude::*;
use futures::executor::block_on;
use futures::channel::oneshot;
use futures::channel::mpsc;
use std::thread;
use std::time;
/// A unit of work handed to a worker; wraps the text to be processed.
#[derive(Debug)]
struct Request(String);
/// A worker's answer to a `Request`; wraps the processed text.
#[derive(Debug)]
struct Response(String);
/// Serves requests for one worker until the request channel is closed.
///
/// Each incoming message pairs a `Request` with the oneshot sender on which the
/// `Response` must be delivered. The response text is the request text converted
/// to ASCII upper case, followed by `" from "` and the worker's `name`.
///
/// * `name` - label appended to every response.
/// * `delay` - simulated processing time per request, in seconds.
/// * `request_rx` - receiving half of the worker's request channel.
///
/// The delay is a blocking `thread::sleep`, so this future stalls whichever
/// thread polls it. `Worker::new` runs it via `block_on` on its own spawned thread.
async fn worker_thread(
    name: String,
    delay: u64,
    mut request_rx: mpsc::Receiver<(Request, oneshot::Sender<Response>)>,
) {
    while let Some((request, response_tx)) = request_rx.next().await {
        // Simulate slow, blocking work.
        thread::sleep(time::Duration::from_secs(delay));
        let response = Response(format!("{} from {}", request.0.to_ascii_uppercase(), name));
        // `send` fails only when the requester has dropped its receiver. That
        // must not bring the worker down, so the undeliverable response is
        // discarded and the loop keeps serving later requests.
        let _ = response_tx.send(response);
    }
}
/// Handle used to submit requests to a single worker.
struct Worker {
/// Sending half of the worker's request channel. Every message pairs a
/// `Request` with the oneshot sender on which the `Response` is returned.
/// Guarded by an async mutex so `request` can send through `&self`.
request_tx: Mutex<mpsc::Sender<(Request,oneshot::Sender<Response>)>>,
}
// Notes: ways to type-erase a future so that differently typed futures can be
// stored in one collection:
//   Box<dyn Future<Output = _>>
//   Pin<Box<dyn Future>>
//   Pin<Box<dyn Future<Output = _>>>
//   futures::future::BoxFuture
//   futures::future::LocalBoxFuture
//   future.boxed() / future.boxed_local()
// See also: https://docs.rs/futures/0.3.2/futures/task/struct.AtomicWaker.html
impl Worker {
    /// Starts a worker on its own OS thread and returns a handle to it.
    ///
    /// * `name` - label the worker appends to every response.
    /// * `delay` - simulated processing time per request, in seconds.
    ///
    /// The spawned thread is detached. It runs `worker_thread` under `block_on`
    /// and is fed through a request channel created with a buffer of 1.
    fn new(name: String, delay: u64) -> Self {
        let (request_tx, request_rx) = mpsc::channel(1);
        thread::spawn(move || {
            block_on(worker_thread(name, delay, request_rx));
        });
        Worker {
            request_tx: Mutex::new(request_tx),
        }
    }

    /// Sends `string` to the worker and resolves to the text of its response.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent on the request channel, or if the
    /// response sender is dropped before a response arrives.
    async fn request(&self, string: String) -> String {
        let (tx, rx) = oneshot::channel();
        {
            // Hold the sender lock only while enqueueing. Keeping the guard
            // alive across `rx.await` would block every other caller of this
            // worker for the whole processing time of this request.
            let mut request_tx = self.request_tx.lock().await;
            request_tx.send((Request(string), tx)).await.unwrap();
        }
        rx.await.unwrap().0
    }
}
/// Issues two requests to each of the first two workers and prints every
/// result (via `dbg!`) in the order the requests complete.
///
/// Indexes `workers[0]` and `workers[1]` directly, so it panics when fewer
/// than two workers are supplied.
async fn test_main(workers: Vec<Worker>) {
    let mut pending = vec![
        workers[0].request(String::from("Hallo a")).boxed_local(),
        workers[0].request(String::from("Hallo b")).boxed_local(),
        workers[1].request(String::from("Hallo c")).boxed_local(),
        workers[1].request(String::from("Hallo d")).boxed_local(),
    ];
    // `select_all` resolves with the first finished future and hands back the
    // unfinished ones; keep racing them until none are left.
    while !pending.is_empty() {
        let (result, _index, remaining) = select_all(pending).await;
        dbg!(result);
        pending = remaining;
    }
}
/// Creates a slow worker (6 s per request) and a fast one (2 s per request),
/// then runs `test_main` against them.
async fn async_main() {
    let slow = Worker::new("Worker 1".to_string(), 6);
    let fast = Worker::new("Worker 2".to_string(), 2);
    // Note: running several `test_main` instances concurrently (for example
    // with `futures::future::join3`) would require `test_main` to borrow the
    // workers instead of taking the `Vec` by value.
    test_main(vec![slow, fast]).await;
}
/// Program entry point: drives `async_main` to completion on the current thread.
fn main() {
    let program = async_main();
    block_on(program);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment