Skip to content

Instantly share code, notes, and snippets.

@vadimcn
Last active November 28, 2017 02:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vadimcn/2a21ceba72ef55e43425a755d4687c3e to your computer and use it in GitHub Desktop.
Save vadimcn/2a21ceba72ef55e43425a755d4687c3e to your computer and use it in GitHub Desktop.
Coroutines
macro_rules! await {
($e:expr) => ({
let mut future = $e;
loop {
match future.poll() {
Ok(Async::Ready(r)) => break r,
Ok(Async::NotReady) => yield Async::NotReady,
Err(e) => return Err(e),
}
}
})
}
// Macro for "yielding" a future in a stream generator/transducer
macro_rules! yield_fut {
($e:expr) => ({
let mut future = $e;
loop {
match future.poll() {
Ok(Async::Ready(r)) => yield Async::Ready(r),
Ok(Async::NotReady) => yield Async::NotReady,
Err(e) => return Err(e),
}
}
})
}
impl<T,E,F> Future for F where F: FnMut() -> CoResult<Async<!>, Result<T,E>> {
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
match self() {
Yield(Async::NotReady) => Ok(Async::NotReady),
Return(Ok(r)) => Ok(Async::Ready(r)),
Return(Err(e)) => Err(e),
// no Yield(Async::Ready(_)) case, because it's uninhabited.
}
}
}
impl<T,E,F> Stream for F where F: FnMut() -> CoResult<Async<T>, Result<(), E>> {
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self() {
Yield(Async::NotReady) => Ok(Async::NotReady),
Yield(Async::Ready(r)) => Ok(Async::Ready(Some(r))),
Return(Ok(())) => Ok(Async::Ready(None)),
Return(Err(e)) => Err(e),
}
}
}
// Usage syntax: async_for!{(x in s) { ... }}
// With procedural macros we could reduce this to an attribute adorning a for loop: #[async] for x in s {...}
macro_rules! async_for {
(($x:pat in $e:expr) $body:block) => ({
let stream = $e;
loop {
match stream.poll() {
Err(e) => return Err(e),
Ok(Async::NotReady) => yield Async::NotReady,
Ok(Async::Ready(None)) => break,
Ok(Async::Ready(Some(r))) => {
let $x = r;
$body
}
}
}
})
}
///////////////////////////////////////////////////////////////////
// Async stream generator
fn make_stream() -> impl Stream<Item=i32, Error=()> {
move || {
for i in 0..100 {
let j = await!(compute_something(i));
yield_fut!(future::ok(j*2));
}
Ok(())
})
}
// Async stream consumer
fn transform_stream<S:Stream<Item=i32, Error=()>>(mut s: S) -> impl Stream {
move || {
async_for!{(x in s) {
let y = await!(compute_something(x));
yield_fut!(future::ok(y + 1));
// Could also write this as `yield Async::Ready(y+1)`, which IMO isn't bad either.
}}
Ok(())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment