Skip to content

Instantly share code, notes, and snippets.

@rubdos
Last active March 6, 2018 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rubdos/e8ddb95def55bc975ccfe748cc3a3911 to your computer and use it in GitHub Desktop.
Save rubdos/e8ddb95def55bc975ccfe748cc3a3911 to your computer and use it in GitHub Desktop.
Joining two streams in futures-async
/// Drives several streams at once and dispatches every item to the handler
/// block belonging to the stream that produced it.
///
/// Invocation shape (the trailing comma is optional, at least one arm is
/// required):
///
/// ```ignore
/// join_streams! {
///     a(item) => { /* handle an item of stream `a` */ },
///     b((left, right)) => { /* handle an item of stream `b` */ },
/// };
/// ```
///
/// * Each arm names a local variable holding a `Stream`; the variable is moved
///   into the joined stream. The stream's error type must convert into
///   `io::Error` via `From`.
/// * The pattern in parentheses is matched against that stream's item. It is
///   the only match arm generated for that stream, so it has to be
///   irrefutable.
/// * Streams are polled in the order they are listed, and the first one that
///   has an item ready wins that round.
/// * The loop stops as soon as *any* polled stream reports its end, even if
///   other streams could still yield items.
/// * The expansion uses `await!` and `?`, so it must be placed where both are
///   valid and where an `io::Error` can be propagated with `?`.
macro_rules! join_streams {
    // Trailing-comma form: drop the final comma and defer to the main rule.
    // The repetition is `+` on purpose: with `*`, an empty invocation matched
    // this rule and re-expanded to itself until the recursion limit was hit.
    ($($stream:ident($stream_item:pat) => $block:block,)+) => {
        join_streams!{$($stream($stream_item) => $block),+}
    };
    ($($stream:ident($stream_item:pat) => $block:block),+) => {
        // The stream variable names are reused as generic parameter, field and
        // variant names, which is why the camel-case lint is silenced below.
        #[allow(non_camel_case_types)]
        struct JoinedStream<$($stream),+> {
            $($stream: $stream),+
        }

        // TODO: It would be nice to find a way in which we don't have to create an intermediate event type
        #[allow(non_camel_case_types)]
        enum JoinStreamEvent<$($stream),+> {
            $($stream($stream)),+
        }

        #[allow(non_camel_case_types)]
        impl<$($stream: Stream),+> Stream for JoinedStream<$($stream),+>
        where
            $(io::Error: From<<$stream as Stream>::Error>),+
        {
            type Item = JoinStreamEvent<$($stream::Item),+>;
            type Error = io::Error;

            fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
                // One `match` per stream, in declaration order. Every arm
                // except `NotReady` returns, so a later stream is only polled
                // when all earlier ones had nothing to offer.
                $(
                    match self.$stream.poll() {
                        Ok(Async::Ready(Some(item))) => {
                            return Ok(Async::Ready(Some(JoinStreamEvent::$stream(item))));
                        }
                        // The end of one stream ends the joined stream.
                        Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
                        Ok(Async::NotReady) => {}
                        Err(e) => return Err(e.into()),
                    }
                )+
                Ok(Async::NotReady)
            }
        }

        let mut s = JoinedStream {
            $($stream),+
        };
        loop {
            // `into_future` consumes the stream and hands it back next to the
            // item; on failure only the error half of the pair is propagated.
            let (event, rest) = await!(s.into_future()).map_err(|e| e.0)?;
            s = rest;
            match event {
                None => break,
                $(
                    Some(JoinStreamEvent::$stream($stream_item)) => $block
                ),+
            }
        }
    };
}
// Example usage. `a` and `b` are placeholders for two stream values.
// The macro expands to statements that use `await!` and `?`, so this snippet
// belongs inside a function body where both are usable.
let a = unimplemented!(); // A stream
let b = unimplemented!(); // A second stream
// One arm per stream: the stream variable, a pattern for its items in
// parentheses, and the block to run for every item of that stream.
join_streams!{
a(item) => {
process_item(item);
},
// The item pattern may destructure; `{...}` stands in for a handler body.
b((item, other_thing)) => {...},
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment