| #![feature(async_await, await_macro, futures_api)] | |
| #![allow(unused)] | |
| use futures::future::Future; | |
| use futures::compat::TokioDefaultSpawner; | |
| use futures::future::{FutureExt, TryFutureExt}; | |
| use futures::future::FutureObj; | |
| use futures::prelude::*; | |
| use futures::stream::StreamExt; | |
| use futures::sink::SinkExt; | |
| use futures::stream::Next; | |
| use tokio; | |
| use tokio::await; | |
| use futures::channel::mpsc; | |
| use futures::channel::oneshot; | |
| use failure::Error; | |
| #[derive(Debug)] | |
| pub enum Message { // assume eventually each worker has its own seperate message and response type | |
| RequestMeaningOfLife, | |
| RequestStockPrice, | |
| RequestExecuteStockTrade, | |
| Response(u64), | |
| } | |
| pub trait MessageHandler { | |
| fn new() -> Self; | |
| fn handle<'a>(&'a mut self, msg: Message) -> FutureObj<'a, Result<Message, Error>>; | |
| } | |
| pub struct Worker<Handler: MessageHandler> { | |
| incoming: mpsc::Receiver<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| pub my_addr: mpsc::Sender<(Message, oneshot::Sender<Result<Message, Error>>)>, | |
| handler: Handler, | |
| } | |
| impl<Handler: MessageHandler> Worker<Handler> { | |
| pub fn new() -> Self { | |
| let (tx, rx) = mpsc::channel(32); | |
| Worker { | |
| incoming: rx, | |
| my_addr: tx, | |
| handler: Handler::new(), | |
| } | |
| } | |
| async fn run(mut self) { | |
| loop { | |
| let t = await!(self.incoming.next()); | |
| let (msg, ret_addr) = t.unwrap(); | |
| let ret_msg = await!(self.handler.handle(msg)); | |
| ret_addr.send(ret_msg).unwrap(); | |
| } | |
| } | |
| } | |
| struct StockMessageHandler { max_seen_price: u64, } | |
| impl MessageHandler for StockMessageHandler { | |
| fn new() -> Self { StockMessageHandler { max_seen_price: 1 } } | |
| fn handle<'a>(&'a mut self, msg: Message) -> FutureObj<'a, Result<Message, Error>> { | |
| FutureObj::new(async move { | |
| match msg { | |
| Message::RequestStockPrice => { | |
| let p = 100; // let p = await!(get_stock_price_from_net()); | |
| if p > self.max_seen_price { | |
| self.max_seen_price = p; | |
| } | |
| Ok(Message::Response(p)) | |
| }, | |
| Message::RequestExecuteStockTrade => { | |
| let profit = 6; // let profit = await!(do_the_trade_from_net()); | |
| Ok(Message::Response(profit)) | |
| }, | |
| _ => panic!(), | |
| } | |
| }.boxed()) | |
| } | |
| } | |
| async fn async_main() -> Result<(), ()> { | |
| let mut stock_worker = Worker::<StockMessageHandler>::new(); | |
| let (tx, rx) = oneshot::channel(); | |
| stock_worker.my_addr.try_send((Message::RequestStockPrice, tx)).unwrap(); | |
| tokio::spawn(stock_worker.run().unit_error().boxed().compat()); | |
| let msg = await!(rx); | |
| match msg { | |
| Ok(Ok(Message::Response(x))) => println!("{}", x), | |
| _ => panic!(), | |
| } | |
| Ok(()) | |
| } | |
| fn main() { | |
| tokio::run(async_main().boxed().compat()); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment