Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created April 27, 2018 21:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rust-play/e524254713e76dbe7e756464a7b8932a to your computer and use it in GitHub Desktop.
Save rust-play/e524254713e76dbe7e756464a7b8932a to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
/// A future that tries a list of socket addresses one after another and
/// resolves to the first TCP connection that succeeds.
pub struct MultiAddrTcpStream {
    /// Addresses still to be tried. Stored in reverse order so that the next
    /// candidate can be taken with `pop()`.
    addrs: Vec<SocketAddr>,
    /// The connection attempt currently in flight, if any.
    future: Option<IoFuture<TcpStream>>,
    /// Optional time limit applied to each individual connection attempt.
    timeout: Option<time::Duration>,
    /// Error produced by the most recent failed attempt.
    last_err: Option<io::Error>,
}
impl MultiAddrTcpStream {
    /// Creates a future that connects to the first reachable address in
    /// `addrs`, trying them in the order given and without any time limit.
    pub fn connect(addrs: Vec<SocketAddr>) -> MultiAddrTcpStream {
        Self::new(addrs, None)
    }

    /// Like [`connect`](#method.connect), but every individual connection
    /// attempt is abandoned once `timeout` has elapsed.
    pub fn connect_timeout(addrs: Vec<SocketAddr>, timeout: time::Duration) -> MultiAddrTcpStream {
        Self::new(addrs, Some(timeout))
    }

    /// Shared constructor behind `connect` and `connect_timeout`.
    fn new(mut addrs: Vec<SocketAddr>, timeout: Option<time::Duration>) -> MultiAddrTcpStream {
        // Candidates are consumed with `pop()`, which takes from the back, so
        // reverse once up front to preserve the caller's ordering.
        addrs.reverse();
        MultiAddrTcpStream {
            addrs,
            future: None,
            timeout,
            last_err: None,
        }
    }
}
impl Future for MultiAddrTcpStream {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if self.future.is_none() {
if let Some(addr) = self.addrs.pop() {
let connect_process = TcpStream::connect(&addr);
if let Some(timeout) = self.timeout.as_ref() {
let deadline_process = timer::Deadline::new(connect_process, Instant::now() + *timeout).map_err(|err| {
match err.into_inner() {
Some(err) => err,
None => io::ErrorKind::TimedOut.into()
}
});
self.future = Some(Box::new(deadline_process));
} else {
self.future = Some(Box::new(connect_process));
}
} else {
if let Some(err) = self.last_err.take() {
return Err(err);
} else {
return Err(io::ErrorKind::InvalidInput.into());
}
}
}
let mut f = self.future.take().unwrap();
match f.poll() {
Ok(Async::Ready(socket)) => return Ok(Async::Ready(socket)),
Ok(Async::NotReady) => {
self.future = Some(f);
return Ok(Async::NotReady);
}
Err(err) => {
self.last_err = Some(err);
continue;
}
}
}
}
}
@dmexe
Copy link

dmexe commented Apr 27, 2018

A more generic version:

/// Adapts a stream into a future that resolves with the first item the
/// stream yields successfully.
///
/// If the stream ends without producing such an item, the future fails with
/// the last error the stream reported, or with `Error::Empty` when the
/// stream reported none.
///
/// Usage:
/// ```ignore
/// let stream = iter_ok(vec![1, 2, 3]);
/// let future = stream.next_ok_or_last_err();
/// ```
struct NextOkOrLastErr<S: Stream> {
    /// The wrapped stream being polled.
    inner: S,
    /// Most recent error seen from `inner`; `Error::Empty` until one occurs.
    last_error: Error<S::Error>,
}

/// Failure reported when a stream finishes without yielding a successful item.
#[derive(Debug)]
enum Error<E> {
    /// The last error reported by the underlying stream.
    Inner(E),
    /// The stream ended without yielding either an item or an error.
    Empty,
}

impl<E> ::std::fmt::Display for Error<E>
where
    E: ::std::fmt::Display,
{
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            // Defer to the wrapped error so its message is shown unchanged.
            Error::Inner(ref inner) => ::std::fmt::Display::fmt(inner, f),
            Error::Empty => f.write_str("stream ended without yielding any item or error"),
        }
    }
}

// Fully-qualified paths are used throughout because this enum shadows the
// name `Error` in the current scope.
impl<E> ::std::error::Error for Error<E> where E: ::std::error::Error {}

impl<S> NextOkOrLastErr<S>
where
    S: Stream,
{
    fn new(stream: S) -> Self {
        NextOkOrLastErr {
            inner: stream,
            last_error: Error::Empty,
        }
    }
}

/// Extension trait providing the `next_ok_or_last_err` adapter method.
trait IntoNextOkOrLastErr<S>
where
    S: Stream,
{
    /// Consumes `self` and returns a `NextOkOrLastErr` future over `S`.
    fn next_ok_or_last_err(self) -> NextOkOrLastErr<S>;
}

/// Blanket implementation: every `Stream` gets `next_ok_or_last_err`.
impl<S: Stream> IntoNextOkOrLastErr<S> for S {
    /// Wraps this stream in a `NextOkOrLastErr` future.
    fn next_ok_or_last_err(self) -> NextOkOrLastErr<S> {
        let adapter = NextOkOrLastErr::new(self);
        adapter
    }
}


impl<S> Future for NextOkOrLastErr<S>
where
    S: Stream,
{
    type Item = S::Item;
    type Error = Error<S::Error>;

    fn poll(&mut self) -> Result<Async<<Self as Future>::Item>, <Self as Future>::Error> {
        loop {
            match self.inner.poll() {
                Ok(Async::Ready(Some(value))) => return Ok(Async::Ready(value)),
                Ok(Async::Ready(None)) => {
                    break;
                }
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(err) => self.last_error = Error::Inner(err),
            }
        }

        let reset = Error::Empty;
        let last_error = mem::replace(&mut self.last_error, reset);

        Err(last_error)
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment