@tarquin-the-brave
Last active February 19, 2020 13:52

Joining Futures that can fail.

The behaviour of join_all in the Rust futures crate changed between futures:0.1 and futures:0.3.

With the old futures API futures:0.1

When you had a bunch of futures that you wanted to wait on, you could join them with futures::future::join_all and wait on the resulting joined future.

Futures had an implicit result: the Future trait carried both an Item and an Error associated type, so all futures "could fail".

join_all would resolve to a single Error as soon as one future failed, without waiting for the remaining futures to complete.

This is inconvenient if you don't want everything to fail because one of the futures failed. For example, you might send off a bunch of requests, not minding if some of them fail, but wanting the data from the successful ones.

This could be worked around by packing the result of each future into the Ok variant of a Result, so that as far as join_all is concerned every future has succeeded. The joined result can then never be an error, so it is safe to unwrap it after joining to get at the individual results.

Here's an example of this using futures:0.1.

With the new futures API futures:0.3

Now futures don't have this implicit result, and the Future trait has a single associated type: Output. Futures don't need to "be able to fail" now. When futures can fail, they return Result as their Output type.

futures::future::join_all will now wait for all of the futures to complete, and when awaited will produce a collection of the Output type of the futures. This makes the above workaround unnecessary. The futures can also be collected into a stream type such as FuturesUnordered, which yields each output as its future completes rather than in the original order. Here's an example of both these approaches using futures:0.3.

The behaviour of bailing out once one future has failed and returning a single error is now performed by futures::future::try_join_all.

try_join_all behaves like the old join_all!
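The two functions also differ in the shape of what they resolve to: join_all gives a `Vec<Result<T, E>>`, while try_join_all gives a `Result<Vec<T>, E>`. The standard library's `collect` offers the same pair of shapes for an iterator of `Result`s, so the sketch below uses it as a stand-in. No futures or executor are involved, and only the shapes are comparable; the timing behaviour is not modelled. The function names and sample data are hypothetical, and `String` stands in for a real error type.

```rust
/// Stand-in for the outputs of three already-completed futures.
/// (Hypothetical data: the second one failed.)
fn sample_outputs() -> Vec<Result<String, String>> {
    vec![
        Ok("Success!".to_string()),
        Err("Failure".to_string()),
        Ok("Success!".to_string()),
    ]
}

/// Same shape as awaiting `join_all`: every result is kept, good or bad.
fn keep_every_result() -> Vec<Result<String, String>> {
    sample_outputs().into_iter().collect()
}

/// Same shape as awaiting `try_join_all`: an error replaces the whole
/// collection, and the successful values are discarded.
fn stop_at_first_error() -> Result<Vec<String>, String> {
    sample_outputs().into_iter().collect()
}
```

With this data, `keep_every_result()` holds all three results, whereas `stop_at_first_error()` is just the one `Err`.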

//!
//! Get the results of a collection of futures.
//!
//! Using the old futures API from futures 0.1.
//!
use failure::{format_err, Error};
use futures::{
    future::{err, join_all, ok},
    Future,
};

enum Outcome {
    Good,
    Bad,
}

fn get_single_future(outcome: Outcome) -> impl Future<Item = String, Error = Error> {
    match outcome {
        Outcome::Good => ok("Success!".to_string()),
        Outcome::Bad => err(format_err!("Failure")),
    }
}

fn get_joined_future() -> impl Future<Item = Vec<Result<String, Error>>, Error = ()> {
    let outcomes = vec![Outcome::Good, Outcome::Bad, Outcome::Good];
    let packed_futures = outcomes
        .into_iter()
        .map(|x| {
            get_single_future(x).then(|res| match res {
                // To avoid join_all() returning a single error as soon as
                // one of these futures fails, pack the result of each future
                // into an ok() future. The joined future then always resolves
                // to an Ok, which we can unwrap once it has resolved.
                Ok(message) => ok(Ok(message)),
                Err(whoopsie) => ok(Err(whoopsie)),
            })
        })
        .collect::<Vec<_>>();
    join_all(packed_futures)
}

pub fn get_results() -> Vec<Result<String, Error>> {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    // The joined future's Error type is (), and it never fails.
    rt.block_on(get_joined_future()).unwrap()
}

//!
//! Get the results of a collection of futures.
//!
//! Using async await syntax and futures 0.3.
//!
use failure::{format_err, Error};
use futures::executor::block_on;

enum Outcome {
    Good,
    Bad,
}

async fn get_single_future(outcome: Outcome) -> Result<String, Error> {
    match outcome {
        Outcome::Good => Ok("Success!".to_string()),
        Outcome::Bad => Err(format_err!("Failure")),
    }
}

async fn get_joined_future() -> Vec<Result<String, Error>> {
    use futures::future::join_all;

    let outcomes = vec![Outcome::Good, Outcome::Bad, Outcome::Good];
    let some_futures = outcomes
        .into_iter()
        .map(|outcome| async {
            get_single_future(outcome).await
        })
        .collect::<Vec<_>>();
    join_all(some_futures).await
}

async fn get_futures_via_stream() -> Vec<Result<String, Error>> {
    use futures::stream::{FuturesUnordered, StreamExt as _};

    let outcomes = vec![Outcome::Good, Outcome::Bad, Outcome::Good];
    outcomes
        .into_iter()
        .map(|outcome| async {
            get_single_future(outcome).await
        })
        // First we collect the collection of futures into the
        // stream type `FuturesUnordered`.
        .collect::<FuturesUnordered<_>>()
        // Then we collect the stream into a future, and await.
        // This collect() method is from the StreamExt trait.
        .collect()
        .await
}

pub fn get_results_with_join() -> Vec<Result<String, Error>> {
    block_on(get_joined_future())
}

pub fn get_results_with_stream() -> Vec<Result<String, Error>> {
    block_on(get_futures_via_stream())
}
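
Both public functions above return a `Vec<Result<String, Error>>`, so the caller still has to separate the data from the failures. Here's one possible way to do that, as a sketch only: `split_results` is a hypothetical helper, not part of this gist's files or of the futures crate, and it is generic so that it needs nothing beyond the standard library.

```rust
/// Splits joined results into the successful values and the errors,
/// preserving the relative order within each group.
fn split_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut successes = Vec::new();
    let mut failures = Vec::new();
    for result in results {
        match result {
            Ok(value) => successes.push(value),
            Err(error) => failures.push(error),
        }
    }
    (successes, failures)
}
```

Something like `let (messages, errors) = split_results(get_results_with_join());` would then give the successful strings and the errors as two separate vectors.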