Instantly share code, notes, and snippets.

@arya1x /main.rs Secret
Created Nov 19, 2018

Embed
What would you like to do?
How to abstract these actors/workers?
#![feature(tool_lints, async_await, await_macro, futures_api)]
#![allow(unused)]
use futures::future::Future;
use futures::compat::TokioDefaultSpawner;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::StreamExt;
use tokio;
use tokio::await;
use futures::channel::mpsc;
use futures::channel::oneshot;
use failure::Error;
pub enum Message { // assume eventually each worker has its own seperate message and response type
RequestMeaningOfLife,
RequestStockPrice,
RequestExecuteStockTrade,
Response(u64),
}
pub struct StockWorker {
incoming: mpsc::UnboundedReceiver<(Message, oneshot::Sender<Result<Message, Error>>)>,
pub my_addr: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<Message, Error>>)>,
max_seen_price: u64,
}
impl StockWorker {
pub fn new() -> StockWorker {
let (tx, rx) = mpsc::unbounded();
StockWorker {
incoming: rx,
my_addr: tx,
max_seen_price: 0,
}
}
pub async fn run(mut self) {
loop {
let (msg, ret_addr) = await!(self.incoming.next()).unwrap();
let ret_msg = await!(self.handle(msg));
ret_addr.send(ret_msg);
}
}
async fn handle(&mut self, msg: Message) -> Result<Message, Error> {
match msg {
Message::RequestStockPrice => {
let p = 100; // let p = await!(get_stock_price_from_net());
if p > self.max_seen_price {
self.max_seen_price = p;
}
Ok(Message::Response(p))
},
Message::RequestExecuteStockTrade => {
let profit = 6; // let profit = await!(do_the_trade_from_net());
Ok(Message::Response(profit))
},
_ => panic!(),
}
}
}
pub struct MeaningWorker {
incoming: mpsc::UnboundedReceiver<(Message, oneshot::Sender<Result<Message, Error>>)>,
pub my_addr: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<Message, Error>>)>,
}
impl MeaningWorker {
pub fn new() -> MeaningWorker {
let (tx, rx) = mpsc::unbounded();
MeaningWorker {
incoming: rx,
my_addr: tx,
}
}
pub async fn run(mut self) {
loop {
let (msg, ret_addr) = await!(self.incoming.next()).unwrap();
let ret_msg = await!(self.handle(msg));
ret_addr.send(ret_msg);
}
}
async fn handle(&mut self, msg: Message) -> Result<Message, Error> {
match msg {
Message::RequestMeaningOfLife => {
let x = 42; // let p = await!(get_meaning_of_life_from_net());
Ok(Message::Response(x))
},
_ => panic!(),
}
}
}
async fn async_main() -> Result<(), ()> {
let mut stock_worker = StockWorker::new();
let mut meaning_worker = MeaningWorker::new();
tokio::spawn(stock_worker.run().unit_error().boxed().compat());
tokio::spawn(meaning_worker.run().unit_error().boxed().compat());
stock_worker.my_addr.send(...);
meaning_worker.my_addr.send(...);
Ok(())
}
fn main() {
tokio::run(async_main().boxed().compat());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment