Joining two streams in futures-async
| // TODO: It would be nice to find a way in which we don't have to create an intermediate event type | |
| macro_rules! join_streams { | |
| ($($stream:ident($stream_item:pat) => $block:block,)*) => { | |
| join_streams!{$($stream($stream_item) => $block),*} | |
| }; | |
| ($($stream:ident($stream_item:pat) => $block:block),*) => { | |
| #[allow(non_camel_case_types)] | |
| struct JoinedStream<$($stream),*> { | |
| $($stream: $stream),* | |
| } | |
| #[allow(non_camel_case_types)] | |
| enum JoinStreamEvent<$($stream),*> { | |
| $($stream($stream)),* | |
| } | |
| #[allow(non_camel_case_types)] | |
| impl<$($stream : Stream),*> Stream for JoinedStream<$($stream),*> | |
| where $(io::Error: From<<$stream as Stream>::Error>),* | |
| { | |
| type Item = JoinStreamEvent<$($stream::Item),*>; | |
| type Error = io::Error; | |
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
| $({ | |
| match self.$stream.poll() { | |
| Ok(Async::Ready(Some(item))) => { | |
| return Ok(Async::Ready(Some(JoinStreamEvent::$stream(item)))) | |
| }, | |
| Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), | |
| Ok(Async::NotReady) => (), | |
| Err(e) => Err(e)?, | |
| } | |
| })* | |
| Ok(Async::NotReady) | |
| } | |
| } | |
| let mut s = JoinedStream { | |
| $($stream),* | |
| }; | |
| loop { | |
| let (event, rest) = await!(s.into_future()).map_err(|e| e.0)?; | |
| s = rest; | |
| match event { | |
| None => break, | |
| $( | |
| Some(JoinStreamEvent::$stream($stream_item)) => $block | |
| ),* | |
| } | |
| } | |
| }; | |
| } | |
| let a = unimplemented!(); // A stream | |
| let b = unimplemented!(); // A second stream | |
| join_streams!{ | |
| a(item) => { | |
| process_item(item); | |
| }, | |
| b((item, other_thing)) => {...}, | |
| }; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment