How to abstract these actors/workers?
| #![feature(tool_lints, async_await, await_macro, futures_api)] | |
| #![allow(unused)] | |
| use futures::future::Future; | |
| use futures::compat::TokioDefaultSpawner; | |
| use futures::future::{FutureExt, TryFutureExt}; | |
| use futures::stream::StreamExt; | |
| use tokio; | |
| use tokio::await; | |
| use futures::channel::mpsc; | |
| use futures::channel::oneshot; | |
| use failure::Error; | |
/// Requests understood by the workers, plus the reply they send back.
///
/// For now every worker shares this one enum; eventually each worker is
/// expected to get its own separate message and response type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Message {
    /// Ask for the meaning of life (handled by `MeaningWorker`).
    RequestMeaningOfLife,
    /// Ask for the current stock price (handled by `StockWorker`).
    RequestStockPrice,
    /// Ask for a stock trade to be executed (handled by `StockWorker`).
    RequestExecuteStockTrade,
    /// Numeric reply carried back to the requester.
    Response(u64),
}
| pub struct StockWorker { | |
| incoming: mpsc::UnboundedReceiver<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| pub my_addr: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| max_seen_price: u64, | |
| } | |
| impl StockWorker { | |
| pub fn new() -> StockWorker { | |
| let (tx, rx) = mpsc::unbounded(); | |
| StockWorker { | |
| incoming: rx, | |
| my_addr: tx, | |
| max_seen_price: 0, | |
| } | |
| } | |
| pub async fn run(mut self) { | |
| loop { | |
| let (msg, ret_addr) = await!(self.incoming.next()).unwrap(); | |
| let ret_msg = await!(self.handle(msg)); | |
| ret_addr.send(ret_msg); | |
| } | |
| } | |
| async fn handle(&mut self, msg: Message) -> Result<Message, Error> { | |
| match msg { | |
| Message::RequestStockPrice => { | |
| let p = 100; // let p = await!(get_stock_price_from_net()); | |
| if p > self.max_seen_price { | |
| self.max_seen_price = p; | |
| } | |
| Ok(Message::Response(p)) | |
| }, | |
| Message::RequestExecuteStockTrade => { | |
| let profit = 6; // let profit = await!(do_the_trade_from_net()); | |
| Ok(Message::Response(profit)) | |
| }, | |
| _ => panic!(), | |
| } | |
| } | |
| } | |
| pub struct MeaningWorker { | |
| incoming: mpsc::UnboundedReceiver<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| pub my_addr: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| } | |
| impl MeaningWorker { | |
| pub fn new() -> MeaningWorker { | |
| let (tx, rx) = mpsc::unbounded(); | |
| MeaningWorker { | |
| incoming: rx, | |
| my_addr: tx, | |
| } | |
| } | |
| pub async fn run(mut self) { | |
| loop { | |
| let (msg, ret_addr) = await!(self.incoming.next()).unwrap(); | |
| let ret_msg = await!(self.handle(msg)); | |
| ret_addr.send(ret_msg); | |
| } | |
| } | |
| async fn handle(&mut self, msg: Message) -> Result<Message, Error> { | |
| match msg { | |
| Message::RequestMeaningOfLife => { | |
| let x = 42; // let p = await!(get_meaning_of_life_from_net()); | |
| Ok(Message::Response(x)) | |
| }, | |
| _ => panic!(), | |
| } | |
| } | |
| } | |
| async fn async_main() -> Result<(), ()> { | |
| let mut stock_worker = StockWorker::new(); | |
| let mut meaning_worker = MeaningWorker::new(); | |
| tokio::spawn(stock_worker.run().unit_error().boxed().compat()); | |
| tokio::spawn(meaning_worker.run().unit_error().boxed().compat()); | |
| stock_worker.my_addr.send(...); | |
| meaning_worker.my_addr.send(...); | |
| Ok(()) | |
| } | |
| fn main() { | |
| tokio::run(async_main().boxed().compat()); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment