Skip to content

Instantly share code, notes, and snippets.

@olix0r
Created July 24, 2020 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save olix0r/95d2b42b3c8ffe8a2f1a87f957405e3b to your computer and use it in GitHub Desktop.
Save olix0r/95d2b42b3c8ffe8a2f1a87f957405e3b to your computer and use it in GitHub Desktop.
use super::Error;
use async_stream::try_stream;
use futures::{prelude::*, stream};
// Flattens a stream-of-streams until an unrecoverable error is encountered.
pub fn flatten<S, T, R>(
mut stream_of_streams: S,
recover: R,
) -> impl TryStream<Ok = T::Ok, Error = Error> + 'static
where
S: TryStream<Ok = T> + Unpin + 'static,
S::Error: Into<Error>,
T: TryStream + Unpin + 'static,
T::Ok: Unpin,
T::Error: Into<Error>,
R: Recover + 'static,
R::Backoff: Unpin,
{
try_stream! {
let mut backoff: Option<R::Backoff> = None;
loop {
if let Some(mut b) = backoff.take() {
if b.next().await.is_some() {
backoff = Some(b);
}
}
match stream_of_streams.try_next().await {
Ok(Some(mut stream)) => {
backoff = None;
loop {
match stream.try_next().await {
Ok(Some(item)) => yield item,
Ok(None) => break,
Err(e) => {
let new_backoff = recover.recover(e.into())?;
backoff = Some(new_backoff);
}
}
}
}
Ok(None) => return,
Err(e) => {
let new_backoff = recover.recover(e.into())?;
backoff = Some(backoff.unwrap_or(new_backoff));
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment