Skip to content

Instantly share code, notes, and snippets.

@stepchowfun
Created March 26, 2019 16:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stepchowfun/acbdd53ec4c6d8b0a44d50d8d4f55af9 to your computer and use it in GitHub Desktop.
Save stepchowfun/acbdd53ec4c6d8b0a44d50d8d4f55af9 to your computer and use it in GitHub Desktop.
fn when<
I: 'static + Send,
F: 'static + Send + Future<Item = I, Error = ()>,
R: 'static + Send,
K: 'static + Send + Fn(&[I]) -> Option<R>,
>(
futures: Vec<F>,
k: K,
) -> Box<dyn Future<Item = R, Error = ()> + Send> {
fn when_rec<
I: 'static + Send,
R: 'static + Send,
K: 'static + Send + Fn(&[I]) -> Option<R>,
>(
stream: Box<dyn Stream<Item = I, Error = ()> + Send>,
mut acc: Vec<I>,
k: K,
) -> Box<dyn Future<Item = R, Error = ()> + Send> {
if let Some(result) = k(&acc) {
Box::new(ok(result))
} else {
Box::new(stream.into_future().then(|result| match result {
Ok((x, s)) => {
if let Some(r) = x {
acc.push(r);
when_rec(s, acc, k)
} else {
Box::new(err(()))
}
}
Err((_, s)) => when_rec(s, acc, k),
}))
}
}
when_rec(
futures
.into_iter()
.map(|future| future.into_stream())
.fold(Box::new(stream::empty()), |acc, x| Box::new(acc.select(x))),
Vec::new(),
k,
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment