Skip to content

Instantly share code, notes, and snippets.

@dmexe
Created April 27, 2018 23:55
Show Gist options
  • Save dmexe/30da6cf4bc614e9ef1e222d24d567d51 to your computer and use it in GitHub Desktop.
Save dmexe/30da6cf4bc614e9ef1e222d24d567d51 to your computer and use it in GitHub Desktop.
/// It converts an inner stream into a future, which
/// pick up a first ok result from the stream and stop
/// or if a stream ended without any successful result return
/// a last recorded error from the stream.
///
/// Usage:
/// ```
/// let stream = iter_ok(vec![1,2,3]);
/// let future = stream.next_ok_or_last_err();
/// ```
struct NextOkOrLastErr<S>
where
S: Stream,
{
inner: S,
last_error: Error<S::Error>,
}
#[derive(Debug)]
enum Error<E> {
Inner(E),
Empty,
}
impl<S> NextOkOrLastErr<S>
where
S: Stream,
{
fn new(stream: S) -> Self {
NextOkOrLastErr {
inner: stream,
last_error: Error::Empty,
}
}
}
trait IntoNextOkOrLastErr<S: Stream> {
fn next_ok_or_last_err(self) -> NextOkOrLastErr<S>;
}
impl<S> IntoNextOkOrLastErr<S> for S
where
S: Stream,
{
fn next_ok_or_last_err(self) -> NextOkOrLastErr<S> {
NextOkOrLastErr::new(self)
}
}
impl<S> Future for NextOkOrLastErr<S>
where
S: Stream,
{
type Item = S::Item;
type Error = Error<S::Error>;
fn poll(&mut self) -> Result<Async<<Self as Future>::Item>, <Self as Future>::Error> {
loop {
match self.inner.poll() {
Ok(Async::Ready(Some(value))) => return Ok(Async::Ready(value)),
Ok(Async::Ready(None)) => {
break;
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => self.last_error = Error::Inner(err),
}
}
let reset = Error::Empty;
let last_error = mem::replace(&mut self.last_error, reset);
Err(last_error)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment