-
-
Save vadimcn/2a21ceba72ef55e43425a755d4687c3e to your computer and use it in GitHub Desktop.
Coroutines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
macro_rules! await { | |
($e:expr) => ({ | |
let mut future = $e; | |
loop { | |
match future.poll() { | |
Ok(Async::Ready(r)) => break r, | |
Ok(Async::NotReady) => yield Async::NotReady, | |
Err(e) => return Err(e), | |
} | |
} | |
}) | |
} | |
// Macro for "yielding" a future in a stream generator/transducer.
//
// Polls the future `$e` until it resolves:
//   * pending -> the enclosing coroutine yields `Async::NotReady` and polls again when resumed;
//   * ready   -> the enclosing coroutine yields `Async::Ready(value)` exactly once,
//                and the macro finishes when the coroutine is resumed;
//   * error   -> the enclosing coroutine returns `Err(..)` with that error.
macro_rules! yield_fut {
    ($e:expr) => ({
        let mut future = $e;
        loop {
            match future.poll() {
                Ok(Async::Ready(r)) => {
                    yield Async::Ready(r);
                    // The future has completed: leave the loop instead of
                    // polling it a second time on the next resume.
                    break;
                }
                Ok(Async::NotReady) => yield Async::NotReady,
                Err(e) => return Err(e),
            }
        }
    })
}
impl<T,E,F> Future for F where F: FnMut() -> CoResult<Async<!>, Result<T,E>> { | |
type Item = T; | |
type Error = E; | |
fn poll(&mut self) -> Poll<T, E> { | |
match self() { | |
Yield(Async::NotReady) => Ok(Async::NotReady), | |
Return(Ok(r)) => Ok(Async::Ready(r)), | |
Return(Err(e)) => Err(e), | |
// no Yield(Async::Ready(_)) case, because it's uninhabited. | |
} | |
} | |
} | |
impl<T,E,F> Stream for F where F: FnMut() -> CoResult<Async<T>, Result<(), E>> { | |
type Item = T; | |
type Error = E; | |
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
match self() { | |
Yield(Async::NotReady) => Ok(Async::NotReady), | |
Yield(Async::Ready(r)) => Ok(Async::Ready(Some(r))), | |
Return(Ok(())) => Ok(Async::Ready(None)), | |
Return(Err(e)) => Err(e), | |
} | |
} | |
} | |
// Usage syntax: async_for!{(x in s) { ... }}
// With procedural macros we could reduce this to an attribute adorning a for loop: #[async] for x in s {...}
//
// Polls the stream `$e` in a loop from inside a coroutine body:
//   * item    -> bound to the pattern `$x`, then `$body` runs;
//   * pending -> the enclosing coroutine yields `Async::NotReady` and polls again when resumed;
//   * end     -> the loop is left;
//   * error   -> the enclosing coroutine returns `Err(..)` with that error.
macro_rules! async_for {
    (($x:pat in $e:expr) $body:block) => ({
        // `poll` takes `&mut self`, so the binding has to be mutable.
        let mut stream = $e;
        loop {
            match stream.poll() {
                Err(e) => return Err(e),
                Ok(Async::NotReady) => yield Async::NotReady,
                Ok(Async::Ready(None)) => break,
                Ok(Async::Ready(Some(r))) => {
                    let $x = r;
                    $body
                }
            }
        }
    })
}
/////////////////////////////////////////////////////////////////// | |
// Async stream generator | |
fn make_stream() -> impl Stream<Item=i32, Error=()> { | |
move || { | |
for i in 0..100 { | |
let j = await!(compute_something(i)); | |
yield_fut!(future::ok(j*2)); | |
} | |
Ok(()) | |
}) | |
} | |
// Async stream consumer | |
fn transform_stream<S:Stream<Item=i32, Error=()>>(mut s: S) -> impl Stream { | |
move || { | |
async_for!{(x in s) { | |
let y = await!(compute_something(x)); | |
yield_fut!(future::ok(y + 1)); | |
// Could also write this as `yield Async::Ready(y+1)`, which IMO isn't bad either. | |
}} | |
Ok(()) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment