Coroutines
| macro_rules! await { | |
| ($e:expr) => ({ | |
| let mut future = $e; | |
| loop { | |
| match future.poll() { | |
| Ok(Async::Ready(r)) => break r, | |
| Ok(Async::NotReady) => yield Async::NotReady, | |
| Err(e) => return Err(e), | |
| } | |
| } | |
| }) | |
| } | |
/// Macro for "yielding" a future in a stream generator/transducer.
///
/// The expression is evaluated once and the resulting future is polled in a
/// loop:
/// * while it reports `Async::NotReady`, the enclosing generator yields
///   `Async::NotReady` and the future is polled again on the next resume;
/// * when it reports `Async::Ready(v)`, the generator yields `Async::Ready(v)`
///   exactly once and the macro finishes;
/// * when it reports `Err(e)`, the enclosing generator returns `Err(e)`.
///
/// The macro itself evaluates to `()`.
macro_rules! yield_fut {
    ($e:expr) => ({
        let mut future = $e;
        loop {
            match future.poll() {
                Ok(Async::Ready(r)) => {
                    yield Async::Ready(r);
                    // Stop here: without this `break` the loop would poll the
                    // already-completed future again on the next resume and
                    // the macro would never finish.
                    break;
                }
                Ok(Async::NotReady) => yield Async::NotReady,
                Err(e) => return Err(e),
            }
        }
    })
}
| impl<T,E,F> Future for F where F: FnMut() -> CoResult<Async<!>, Result<T,E>> { | |
| type Item = T; | |
| type Error = E; | |
| fn poll(&mut self) -> Poll<T, E> { | |
| match self() { | |
| Yield(Async::NotReady) => Ok(Async::NotReady), | |
| Return(Ok(r)) => Ok(Async::Ready(r)), | |
| Return(Err(e)) => Err(e), | |
| // no Yield(Async::Ready(_)) case, because it's uninhabited. | |
| } | |
| } | |
| } | |
| impl<T,E,F> Stream for F where F: FnMut() -> CoResult<Async<T>, Result<(), E>> { | |
| type Item = T; | |
| type Error = E; | |
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
| match self() { | |
| Yield(Async::NotReady) => Ok(Async::NotReady), | |
| Yield(Async::Ready(r)) => Ok(Async::Ready(Some(r))), | |
| Return(Ok(())) => Ok(Async::Ready(None)), | |
| Return(Err(e)) => Err(e), | |
| } | |
| } | |
| } | |
/// Iterates over the items of a stream from inside a generator body.
///
/// Usage syntax: `async_for!{(x in s) { ... }}`
/// With procedural macros we could reduce this to an attribute adorning a
/// for loop: `#[async] for x in s {...}`
///
/// The stream expression is evaluated once and then polled in a loop:
/// * `Err(e)` makes the enclosing generator return `Err(e)`;
/// * `Async::NotReady` makes the enclosing generator yield `Async::NotReady`
///   and poll again on the next resume;
/// * `Async::Ready(None)` ends the loop;
/// * `Async::Ready(Some(item))` binds `item` to the pattern and runs the body.
///
/// The body is expanded directly inside the `loop`, so a `break` or
/// `continue` written in the body applies to this loop.
macro_rules! async_for {
    (($x:pat in $e:expr) $body:block) => ({
        // `poll` takes `&mut self`, so the binding has to be mutable.
        let mut stream = $e;
        loop {
            match stream.poll() {
                Err(e) => return Err(e),
                Ok(Async::NotReady) => yield Async::NotReady,
                Ok(Async::Ready(None)) => break,
                Ok(Async::Ready(Some(r))) => {
                    let $x = r;
                    $body
                }
            }
        }
    })
}
| /////////////////////////////////////////////////////////////////// | |
| // Async stream generator | |
| fn make_stream() -> impl Stream<Item=i32, Error=()> { | |
| move || { | |
| for i in 0..100 { | |
| let j = await!(compute_something(i)); | |
| yield_fut!(future::ok(j*2)); | |
| } | |
| Ok(()) | |
| }) | |
| } | |
| // Async stream consumer | |
| fn transform_stream<S:Stream<Item=i32, Error=()>>(mut s: S) -> impl Stream { | |
| move || { | |
| async_for!{(x in s) { | |
| let y = await!(compute_something(x)); | |
| yield_fut!(future::ok(y + 1)); | |
| // Could also write this as `yield Async::Ready(y+1)`, which IMO isn't bad either. | |
| }} | |
| Ok(()) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment