@zhangyuchi
Forked from bmwill/futures03_async_await.md
Created September 5, 2019 07:04
Futures 0.3 and Async/Await

OUTDATED

This document is a short tutorial on how to use the new-style futures (0.3), how to use Async/Await, and how to use the compatibility layer to interoperate with old futures (0.1) and tokio.

Terminology

When dealing with futures in this new world there are a lot of different parts, and it can be difficult to keep them straight. Here is a quick rundown of the parts and what they are.

std::future::Future - The only thing being merged into the standard library as part of the futures_api feature is the Future trait. This trait encapsulates a unit of work that makes progress when its poll() method is invoked, returning Poll::Pending if the computation can't make forward progress yet or Poll::Ready(Self::Output) if the computation is complete. If you want to know more about how std::future::Future works, you might also want to look at Task and Pin.
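To make the poll() contract concrete, here is a minimal sketch of a hand-written future, written against the Future trait as it exists in std on current stable Rust. The names `CountDown`, `NoopWake`, and `poll_to_completion` are hypothetical and exist only for this illustration, and the busy-polling loop is a toy stand-in for an executor, not how a real one schedules work.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: returns Pending `remaining` times, then completes.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            // The computation is complete.
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again. A real future would hand the waker to
            // whatever event source it is waiting on instead.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that does nothing; enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for this sketch): polls in a loop until the
/// future is ready and reports how many times poll() was called.
fn poll_to_completion<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

Calling `poll_to_completion(CountDown { remaining: 2 })` polls the future three times: two calls return Poll::Pending and the third returns Poll::Ready("done").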

Futures 0.3 - A crate which provides a number of useful abstractions for asynchronous programming. It is built on top of std::future::Future and provides many extensions to the base Future trait, including Sinks, Streams, channels, AsyncRead, and AsyncWrite, as well as a compatibility layer for converting between Futures 0.3 and Futures 0.1 (enabled with the “compat” feature). The crate also includes all of the combinators you know and love from the futures 0.1 world, plus a very basic threadpool which futures can be spawned on.

Async/Await - The Async syntax allows you to define asynchronous functions (async fn foo() -> u8) or execution blocks (async { . . . }). An async function or block essentially de-sugars into fn foo() -> impl Future<Output = u8> where the compiler generates the state machine representing the computation inside. While inside an async context you can use the await!() macro to asynchronously wait for the result of the future being awaited.
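The de-sugaring described above can be sketched with a pair of equivalent functions. This is a rough illustration that assumes current stable Rust, where the postfix `.await` syntax took the place of the await!() macro used elsewhere in this document. `foo_desugared`, `sum`, `NoopWake`, and the tiny `block_on` are hypothetical names for this sketch; the busy-polling `block_on` is only there to keep the example free of external crates.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// An async function...
async fn foo() -> u8 {
    5
}

// ...is roughly equivalent to a plain function returning `impl Future`,
// with the body moved into an async block.
fn foo_desugared() -> impl Future<Output = u8> {
    async { 5 }
}

// Inside an async context both forms are awaited the same way.
async fn sum() -> u8 {
    foo().await + foo_desugared().await
}

/// Hypothetical waker that does nothing; enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for this sketch): polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

Nothing runs until the returned future is polled, so `block_on(sum())` is what actually drives both calls; in real code an executor such as the one in futures::executor would play that role.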

Futures 0.1 - The crate containing the old style futures trait Future as well as convenience functions for working with them (e.g. all those combinators).

Tokio - A crate implementing a work-stealing threadpool (Executor) which futures 0.1 Futures can be spawned on. Tokio is only compatible with the old-style futures, but 0.3 futures can be spawned on a Tokio runtime (with zero issues) using the compatibility layer included in the Futures 0.3 crate.

What you'll need

First you'll need to have the following in your Cargo.toml:

[dependencies]
# Futures 0.3
futures = { version = "0.3.0-alpha.13", package = "futures-preview", features = ["compat"] }
# Futures 0.1
futures_01 = { version = "0.1.25", package = "futures" }
tokio = "0.1.16"

And you'll need to turn on the following features in main.rs or lib.rs:

#![feature(await_macro, async_await, futures_api)]

  • await_macro - Lets you use the await!() macro
  • async_await - Lets you use async fn and async blocks
  • futures_api - Lets you use the new Futures 0.3 API

Simple Example

This is a simple example where only 0.3 Futures are used with async/await. The only compatibility layer needed is to spawn the async functions onto tokio's runtime.

#![feature(await_macro, async_await, futures_api)]

use futures::channel::mpsc;
use futures::prelude::*;
use tokio::runtime::Runtime;

async fn send_titles(mut sender: mpsc::Sender<&'static str>) {
    await!(sender.send("Way of Kings")).unwrap();
    await!(sender.send("Words of Radiance")).unwrap();
    await!(sender.send("Oathbringer")).unwrap();
}

async fn read_titles(mut receiver: mpsc::Receiver<&'static str>) {
    assert_eq!(await!(receiver.next()), Some("Way of Kings"));
    assert_eq!(await!(receiver.next()), Some("Words of Radiance"));
    assert_eq!(await!(receiver.next()), Some("Oathbringer"));
}

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let (sender, receiver) = mpsc::channel::<&'static str>(10);

    // Spawn the async function on tokio using the Compatibility layer.  The
    // `boxed` function is used to pin the future produced by the async
    // function. This is needed because futures generated via async functions
    // and blocks don't implement `Unpin`.  The `unit_error` function converts a
    // Future with Output=T to a TryFuture with an Output=Result<T, ()>. This is
    // needed because 0.1 Futures require an associated Error type.  The
    // `compat` function takes a TryFuture and puts it in a wrapper so it acts
    // like a 0.1 Future.
    runtime.spawn(send_titles(sender).boxed().unit_error().compat());
    runtime
        .block_on(read_titles(receiver).boxed().unit_error().compat())
        .unwrap();
}

0.1 → 0.3 compatibility layer

Here is the same example as above, this time using 0.1 Sinks/Streams. The send_titles_01() function is how you would write the sender using 0.1 Futures. The async send_titles() function takes the future produced by send_titles_01(), uses the compatibility layer to convert it to a 0.3 Future, and awaits it. The read_titles() function takes a 0.1 Stream, converts it to a 0.3 Stream, and then uses the normal await!() macro to await the next value from the stream. As before, the async functions need to go through the compatibility layer to be spawned onto tokio's runtime.

#![feature(await_macro, async_await, futures_api)]

use futures::compat::Future01CompatExt;
use futures::compat::Stream01CompatExt;
use futures::prelude::*;
use futures_01::future::Future as Future01;
use futures_01::sink::Sink as Sink01;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn send_titles_01(sender: mpsc::Sender<&'static str>) -> impl Future01<Item = (), Error = ()> {
    sender
        .send("Way of Kings")
        .and_then(|sender| sender.send("Words of Radiance"))
        .and_then(|sender| sender.send("Oathbringer"))
        .map(|_sender| ())
        .map_err(|_| ())
}

async fn send_titles(sender: mpsc::Sender<&'static str>) {
    // Compat the Future01 to a Future03 using the `compat` function
    // from the Future01CompatExt trait.
    await!(send_titles_01(sender).compat()).unwrap();
}

async fn read_titles(receiver: mpsc::Receiver<&'static str>) {
    // Compat the Stream01 to a Stream03 using the `compat` function
    // from the Stream01CompatExt trait.
    let mut receiver = receiver.compat();

    assert_eq!(await!(receiver.next()).unwrap().unwrap(), "Way of Kings");
    assert_eq!(
        await!(receiver.next()).unwrap().unwrap(),
        "Words of Radiance"
    );
    assert_eq!(await!(receiver.next()).unwrap().unwrap(), "Oathbringer");
}

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let (sender, receiver) = mpsc::channel::<&'static str>(10);

    runtime.spawn(send_titles(sender).boxed().unit_error().compat());
    runtime
        .block_on(read_titles(receiver).boxed().unit_error().compat())
        .unwrap();
}

Known Limitations

  • Async functions and blocks (async fn() or async { }) currently only support one lifetime - rust-lang/rust#56238. This is listed as a blocker for landing Async/Await in stable. There is a solution for this issue; the work just needs to be done in the compiler. However, if you absolutely need this functionality you can use the following workaround (which happens to be the same workaround needed for similar limitations of impl Trait):
use std::future::Future;

trait Captures<'a> {}

impl<'a, T> Captures<'a> for T {}

fn foo<'a: 'c, 'b: 'c, 'c>(
    x: &'a mut i32,
    y: &'b mut i32,
) -> impl Future<Output = i32> + Captures<'a> + Captures<'b> + 'c {
    async move {
        *x += *y;
        *y += *x;
        *x + *y
    }
}
  • Trait functions can't be async - https://boats.gitlab.io/blog/post/async-methods-i/. This is not a blocker for stabilizing async/await. Async functions simply de-sugar into functions which return an impl Future<Output=T>, and we can't return impl Trait from trait functions, so another feature needs to be implemented before traits can have async functions (or impl Trait return types). Just like in the impl Trait case, though, we can work around the issue by using trait objects, with the caveat that these async-generated trait objects must be pinned:
#![feature(async_await, futures_api)]

use std::pin::Pin;
use futures::future::Future;
use futures::future::FutureExt;
use futures::executor::block_on;

trait Stormlight {
    fn title(&self) -> Pin<Box<dyn Future<Output = &'static str>>>;
}

struct Oathbringer;

impl Stormlight for Oathbringer {
    fn title(&self) -> Pin<Box<dyn Future<Output = &'static str>>> {
        async {
            "Oathbringer"
        }.boxed()
    }
}

fn main() {
    println!("Title: {}", block_on(Oathbringer.title()));
}
  • async fn() can't be recursive - rust-lang/rust#53690. This actually makes sense: async functions don't have a “real” stack, so all the memory they need has to be statically sized at compile time. This could be done if tail recursion was a thing in Rust, but it's not at the moment, so instead we have to rely on intermediate heap allocations and type erasure to work around the problem. For example, we can do the following to implement a recursive async fn. Note: You have to explicitly erase the type of the recursive call for this to work.
use std::future::Future;
use std::pin::Pin;

async fn recursive(i: u32) -> u32 {
    let j = if i > 0 {
        let f: Pin<Box<dyn Future<Output = _>>> = Box::pin(recursive(i - 1));
        await!(f)
    } else {
        0
    };

    i + j
}
  • Dropped variables can still be included across await points: rust-lang/rust#57478. The workaround is to wrap the offending variables in their own block scope ({ . . . }) so that they go out of scope before the await point.
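The scoping workaround from the last item can be sketched as follows, assuming current stable Rust and the postfix `.await` syntax. To make the effect observable, the sketch assumes the offending variable is an Rc, which is not Send and would make the whole future non-Send if it were captured across the await point. `checkpoint`, `scoped_sum`, `require_send`, `NoopWake`, and the toy `block_on` are hypothetical names for this illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for any awaited operation.
async fn checkpoint() {}

async fn scoped_sum() -> u32 {
    // The Rc lives only inside this block, so it is gone before the await
    // point and is not stored in the future's state.
    let n = {
        let shared = Rc::new(41);
        *shared + 1
    };
    checkpoint().await;
    n
}

// Only compiles for futures that are Send, which is what spawning onto a
// multi-threaded runtime typically requires.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// Hypothetical waker that does nothing; enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for this sketch): polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

With the block scope in place, `require_send(scoped_sum())` type-checks and `block_on` drives it to 42. Moving the `Rc` out of the inner block so that it is still alive at `checkpoint().await` would make the same call fail to compile.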

Miscellaneous tips when using Futures 0.3

  • If you miss the ability to “wait” for a future using the .wait() method on a 0.1 future, you can use the block_on() function located in the Futures 0.3 crate at futures::executor::block_on.
  • There are new channels (both oneshots and mpsc) which natively work with async/await and implement the new versions of Sink/Stream located in the Futures 0.3 crate at futures::channel.
  • This time around, all of the combinators and extension functions for Futures, Sinks, and Streams are part of their own Ext family of traits (FutureExt, SinkExt, StreamExt).
  • You can only compat a 0.3 Future to a 0.1 Future when it's a TryFuture, that is, a 0.3 Future with an Output = Result<T, E>.
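The last tip is easier to see with a std-only sketch of what an adapter like unit_error has to do: take a future with Output = T and produce one with Output = Result<T, ()>. This assumes current stable Rust with the postfix `.await` syntax. `with_unit_error` is a hypothetical stand-in written for this illustration, not the futures crate's implementation, and `NoopWake` and the toy `block_on` exist only to avoid external crates.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical adapter: wraps the output of any future in `Ok`, giving the
/// result the `Result<T, E>` shape that a 0.1-style Future expects.
async fn with_unit_error<F: Future>(fut: F) -> Result<F::Output, ()> {
    Ok(fut.await)
}

/// Hypothetical waker that does nothing; enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for this sketch): polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

For example, `block_on(with_unit_error(async { 7u8 }))` evaluates to `Ok(7)`. The error type carries no information; it exists only so the output has the shape the compatibility layer needs.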