Skip to content

Instantly share code, notes, and snippets.

@alex-shapiro
Created July 26, 2017 13:57
Show Gist options
  • Save alex-shapiro/ab398f5a6a59ebf182b50fd50790d375 to your computer and use it in GitHub Desktop.
Save alex-shapiro/ab398f5a6a59ebf182b50fd50790d375 to your computer and use it in GitHub Desktop.
An "AND" version of the futures stream combinator
//! An adapter for merging the output of two streams, where
//! the stream resolves as soon either stream resolves.
use futures::{Poll, Async};
use futures::stream::{Stream, Fuse};
pub struct AndSelect<S1, S2> {
stream1: Fuse<S1>,
stream2: Fuse<S2>,
flag: bool,
}
pub fn new<S1, S2>(stream1: S1, stream2: S2) -> AndSelect<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
AndSelect {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
}
}
impl<S1, S2> Stream for AndSelect<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;
fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let (a, b) = if self.flag {
(&mut self.stream2 as &mut Stream<Item=_, Error=_>,
&mut self.stream1 as &mut Stream<Item=_, Error=_>)
} else {
(&mut self.stream1 as &mut Stream<Item=_, Error=_>,
&mut self.stream2 as &mut Stream<Item=_, Error=_>)
};
match a.poll()? {
Async::Ready(Some(item)) => {
self.flag = !self.flag;
return Ok(Some(item).into())
}
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => false,
};
match b.poll()? {
Async::Ready(Some(item)) => Ok(Some(item).into()),
Async::Ready(None) => Ok(None.into()),
Async::NotReady => Ok(Async::NotReady),
}
}
}
@marcelbuesing
Copy link

Thank you! I just ran into the websocket issue. Do you mind if I add this file here under MIT license? I would add a reference to this gist.

@goodclouddata
Copy link

Thanks a lot ! at this point, It stuck me a log time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment