Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

Looking into the Future

futures-rs is the library which will hopefully become a shared foundation for everything async in Rust. However it's already become renowned for having a steep learning curve, even for experienced Rustaceans.

I think one of the best ways to get comfortable with using a library is to look at how it works internally: often API design can seem bizarre or impenetrable and it's only when you put yourself in the shoes of the library author that you can really understand why it was designed that way.

In this post I'll try to put down on "paper" my understanding of how futures work and I'll aim to do it in a visual way. I'm going to assume you're already somewhat familiar with Rust and why futures are a useful tool to have at one's disposal.

For most of this post I'll be talking about how things work today (as of September 2017). At the end I'll touch on what's being proposed next and also make a case for some of the changes I'd like to see.

If you're interested in learning more about how to use futures or tokio there's no better starting point than the official documentation.

A simple example

To start with, let's look at a code example:

extern crate futures;

fn main() {
    use futures::future::*;

    // Part 1
    let simple_future = lazy(|| {
        println!("Running the closure.");
        ok::<u32, ()>(42)
    });

    // Part 2
    println!("Waiting on the future.");
    let result = simple_future.wait();

    println!("{:?}", result);
}

You should be able to run this and get the following output:

Waiting on the future.
Running the closure.
Ok(42)

On the face of it, this seems quite straightforward but there's a lot going on to make this all work!

Constructing the future (Part 1)

The first part of the program is concerned with constructing the Lazy future, which looks something like this:

Lazy future in the "Unpolled" state

This type of future starts in what I'm going to call the "Unpolled" state. In this state it stores a closure (or really any type implementing FnOnce). The size of this closure is determined by the set of variables it captures: in this case our closure is actually zero-sized, since we don't capture anything.

Many of the future combinator methods, including lazy(...), take closures and/or other futures and combine them together to produce a new future. These inner futures and closures are typically stored inline as fields of the outer future, which is why I've shown the FnOnce closure contained within the Lazy future.

Driving the future (Part 2)

The second part of the program actually drives the future to completion, using the .wait() call. Note that this method is easy to misuse because it executes the future synchronously and so this is not generally used in real programs, but it is useful for demonstration purposes.

Futures implement a single method called poll which contains all of the logic specific to that future. The poll method does whatever work it can without blocking and then returns whether the future has completed or not.

The wait method "drives" the future by polling it until it completes. In the case of the Lazy future, a call to wait looks like this:

Call stack

When the Lazy future is first polled, it calls the closure that it contains and then transitions to the "Polled" state, which stores the result of the closure:

Lazy future in the "Polled" state

In this case, the closure is called, it prints out "Running the closure", and then it returns another future, this time a FutureResult.

Once the Lazy future has transitioned to the Polled state, its poll method recursively polls the inner future, as shown above.

Another example

The AndThen future is commonly used. It's helpful to compare it against the last example to see which aspects are specific to Lazy, and which aspects generalise across other kinds of future.

AndThen stores its nested future and closure inline, just like Lazy, but from now on I'll use arrows to represent the hierarchy of futures since it makes the diagrams clearer.

AndThen state transition

In the first state, AndThen stores a future to run immediately, and a closure to run later. Each time the AndThen future is polled, it recursively polls FutureA. As soon as FutureA completes successfully, the closure is called. The closure takes as input the result from FutureA, and returns a value that can be converted into a second future, FutureB. At this point, AndThen transitions to the second state.

In the second state, it no longer stores FutureA or the closure, since they have already been used. Now it just stores the result of the closure, FutureB. Each time AndThen is polled, it recursively polls FutureB.

What is a task?

This recurring pattern of futures contained within futures can be thought of as a tree structure. In futures terminology, the entire tree of futures constitutes a "task". The task is driven to completion by polling the future at the root of the tree.

Futures tree

This reveals another facet common to many futures: as each leaf future is polled to completion, they tend to be pruned from the tree, such that tree only contains those futures which are still running. Look back at the AndThen example to see this in action. Eventually the entire tree is pruned back to just the root, and the task as a whole completes.

The tree is also a kind of state machine: calling poll transitions from one state to the next until an accepting state is reached (ie. the task is complete). For the Lazy example the state machine would look something like this:

Finite state machine

The combinators provided by the futures library allow constructing arbitrarily large trees of futures, which can result in much more complex state diagrams.

The reactor

The reactor manages a set of tasks: it ensures that the future at the root of each task is repeatedly polled until the task is complete. The .wait() method on futures can be thought of as a very simple kind of reactor which manages a single task.

This is the closest that the futures crate gets to implementing a reactor, which is a common source of confusion for people trying to learn how to use the library. Reactors are needed for any real-world use of futures, but they are only implemented by external crates such as tokio and futures-cpupool.

There are a few new concepts that are needed to understand reactors, so we'll discuss them next.

NotReady

In the examples above, the task could be driven to completion immediately - it didn't need to wait for a long running computation to complete or the result of a HTTP request. However, sometimes a future cannot make progress until some event has occurred. To achieve this, futures can return a special value, NotReady.

However, this gives the reactor a problem: when a task returns NotReady, the reactor could keep trying to poll it continuously until the task completes, but this would be hugely wasteful. Ideally, it would "suspend" the task for a period of time, and then only poll it again when there's a chance it can make progress. This suspension and resumption of tasks (which are commonly referred to as "parking" and "unparking" a task) is one of the largest sources of complexity in the futures library.

The Task::notify() method

To solve this problem, futures must somehow notify the reactor on which they are running when it is time for them to be re-polled. To do this, any future wanting to return NotReady must first obtain a handle to the task (via task::current()) and stash it away. When the future is ready to make progress, something must take that stashed task handle, and call task.notify().

Let's look at an example:

Waking a task after timer elapses

In this case, I've invented a new future, SleepUntil, which completes once the current time reaches a certain value. It achieves this by calculating the time remaining to wait, and asking a Timer to notify the current task after that time has elapsed. Note that futures may be woken spuriously, so it's important that SleepUntil checks that enough time has elapsed, and schedules a new notify event if not.

AndThen runs one future, and then if that completes successfully, it runs a second future. On the first poll, AndThen polls FutureResult, which of course resolves immediately. It then polls SleepUntil, which schedules the current task to be woken up in five seconds, and then returns NotReady, which AndThen dutifully passes back to the reactor.

At this point the reactor puts the task to one side, and will no longer try to poll it. When there are no more tasks to be polled, the reactor goes to sleep to avoid using unnecessary CPU cycles.

Now when the time has elapsed, the task is notified, which wakes the reactor and tells it to re-poll the task. This time, AndThen goes straight to polling SleepUntil since the other future had already completed. It sees that enough time has elapsed, and so completes successfully.

FuturesUnordered

As we saw in the previous example, each time a task is notified and re-polled, it's polled from the root, and then recurses down into the futures beneath. This is fine for combinators which execute their children sequentially, such as AndThen, because while the tree of "active" futures may get quite deep, it will never get very broad - at any time there is only one leaf node in the tree that must be polled.

However, some combinators, like FuturesUnordered, wait for any of a large number of child futures to complete.

FuturesUnordered takes a set of futures, and returns a stream of results from those futures in whatever order the futures complete. This is useful any time you want to run a set of futures in parallel, because it doesn't enforce a sequential ordering the way AndThen does, but it means that the tree of futures becomes arbitrarily broad.

A naive implementation would poll every single child each time the FuturesUnordered stream was polled to see if any of them have become ready, but this would be very slow. To solve this problem, the futures library changed how tasks are notified.

This is where the notion of a "task" gets slightly murky, and things get a little complicated: instead of task.notify() waking the task as a whole, it now wakes the specific leaf of the tree which returned NotReady:

Waking leaf node

The default Task just notifies the reactor, so if a future wants to be notified when one of its children becomes ready, it replaces the current Task with a temporary one which both notifies the original Task and performs some additional work specific to the future.

As an example, the FuturesUnordered combinator gives each of its children an identifier, and then before polling a child it sets a custom task handle, which adds that child's identifier to a concurrent set of identifiers if it is ever notified. Now, whenever the FuturesUnordered future is polled, it need only poll the children which have since been added to that set.

This pseudo-code shows how roughly this works, although the actual implementation differs for safety and performance reasons:

let children_to_poll = ConcurrentSet::new();

fn poll_child_future(id, future) {
    // Save the old task
    let old_task = get_current_task();
    
    // Set a new task, which adds the ID to the concurrent set when it is notified
    set_current_task(CustomTask(|| {
        children_to_poll.insert(id);
        old_task.notify();
    }));

    // Poll the child future while the custom task is active
    child_future.poll();

    // Restore the old task
    set_current_task(old_task);
}

When is a task not a task

This all works, but it adds a great deal of confusion over what exactly task::current() does, and how it is implemented. It no longer just returns a handle to the current task. Instead, it's more like a combination of a task and some form of path to a specific future running as part of that task, although it's unclear if this analogy will remain useful in future.

task::current() is implemented via a scoped-thread-local variable. It is initialized by a reactor before polling a task and as discussed above, may be overridden by specific futures within the task. Outside of #![no_std] contexts, it is implemented as an Arc<Notify>, where Notify is a trait object which encapsulates notifying the relevant reactor, along with some methods for cloning and dropping task handles. In #![no_std] contexts, it performs the same function, but offloads some of the complexity onto the user, who must implement any memory management required.

Tokio

That's all fine, but at some point we actually need to be able to perform some IO asynchronously. This is where tokio comes in: it takes the mio library which provides asynchonous network IO, and adapts the API to return futures.

A mio program includes an event loop, which listens for network events from the operating system and then dispatches them appropriately. Tokio uses the mio event loop to implement a reactor. Each time a network event comes in, it is mapped to an existing task handle, which is then notified. At the same time, the event loop is extended to listen for notify events as well. Now the logic looks something like this:

loop {
    match receive_event() {
        NetworkEvent(mio_id) => map_mio_id_to_task(id).notify(),
        NotifyEvent(task_id) => poll_task(task_id),
        ... // There are also additional events, such as one for spawning a new task onto the reactor, which I haven't included.
    }
}

Note that tasks cannot be polled directly in response to network events, because task.notify() doesn't only wake the task: it also triggers any additional behaviours added to the task handle (by the likes of FuturesUnordered). For this reason, all network events must take two trips through the event loop before being processed.

Thread safety

The implementation of the tokio event loop is made simpler by the fact that almost none of it needs to be thread-safe. The only operation required to be thread-safe is that of posting new events onto the queue.

This is also why the distinction between Handle and Remote exists in tokio. Performing IO requires setting up the appropriate mappings from mio IDs to task handles, among other things. This cannot be done concurrently because the relevant data structures are not thread-safe. The solution is to use a Remote to spawn a function onto the correct thread, which can then perform any IO required.

!Send Handle

Since the tokio Handle must stay on the mio event loop, it doesn't implement Send. This is problematic, because a typical way to obtain a Handle is by capturing it as part of a closure which will be turned into a future. This has the knock-on effect of making the future itself !Send, which can make them harder to use.

Proposed changes

A major new RFC proposes various changes to how futures and tokio work in order to make them easier to use, which I'll summarise here:

Changes to futures

These changes are relatively minor, and mostly involve improvements to the API surface. For example, the .wait() method will be removed, since it was easy to misuse and accidentally cause synchronous behaviour. Instead a basic reactor design called TaskRunner will be provided to run tasks on a single thread, along with some helper functions for common tasks.

Changes to tokio

Tokio is changing more dramatically: instead of combining the mio event loop and futures reactor into one, they will be split out.

  • A default global mio event loop will be created when a tokio Handle is constructed using the Default trait.
  • It will not be possible to spawn futures to the default event loop, but it will still be possible to construct a custom reactor by combining a TaskRunner with the tokio API using relatively little code.
  • The tokio internals will be made mostly thread-safe, such that IO can be performed from any thread.
  • There will no longer be a need for a Remote handle.

While these changes should make it easier to use tokio, they're likely to complicate the implementation of tokio itself and may have a negative effect on performance due to the additional synchronisation requirements.

My take on futures and tokio

I think the reactive model futures is based on is very powerful, and the proposed changes should remove some of the frustrations with the API. However I still see some problems:

Implementation complexity

If you want to use a library in production, it's not enough for it to be easy to use. It also has to have an implementation that is understandable, since at some point you may end up with a production issue that needs to be traced back through that library. There is a huge amount of complexity in futures and tokio, and I believe some of it can still be eliminated.

Finally, the heavy use of thread locals and shared state are likely to make it difficult to follow execution through the program, and reduce the effectiveness of the static analysis performed by the compiler as part of type checking.

Overly specialised for tokio

Futures are portrayed as a fully general building block for asynchronous programming, capable of being used on any kind of reactor or event loop. However, the design has been very focused on supporting the tokio use-case, and there's a danger that it doesn't easily generalise to other kinds of reactor.

Making tokio accessible from multiple threads may well be the best direction for the library to take, but it takes some of the pressure off making futures easier to use in general. For the case of a reactor based on a UI event loop, making it thread-safe is usually not possible. It would therefore be helpful for some futures to be able to obtain additional information from the reactor itself, or to restrict which reactors they can be polled on.

What I'd like to see

These are a set of changes which I think together form a particularly coherent vision but which could be applied individually.

Removing thread-local usage

Instead of having a single Task type obtained from task::current(), the current task would passed as a parameter to the poll() method of futures. This clearly distinguishes functions which must be run on a task, from those which can be run anywhere.

You can read the discussion on this feature request. The primary objections seem to be the need to add specialised versions of the io::{Read, Write} traits which take an additional task parameter. On the other hand, there is a huge footgun with using the existing traits, which is that only some implementors will add the correct notify() calls to make them compatible with futures: those that don't will compile fine, but may cause your code to hang unpredictably at runtime when a call happens to return NotReady but never wakes up the task.

What is currently called a Task would be renamed to TaskHandle, and would instead be obtained via a .handle() method on the task parameter.

Generic task type

As a follow on to the previous point, it's possible to make the task type generic. This works by splitting Future into two traits:

trait Future {
    type Item;
    type Error;
}
trait Pollable<TaskT>: Future {
    fn poll(&mut self, task: &mut TaskT) -> Poll<Self::Item, Self::Error>;
}

This allows you to place statically checked requirements on where your future can be polled, whilst also requesting additional information from the reactor.

As an example, the tokio reactor could provide the Handle via a method on the TokioTask type used in place of TaskT. Now you can implement futures which are guaranteed to be scheduled to a valid tokio reactor.

Similarly, on a UI thread, you could provide methods on the task to access the UI state.

Notify tasks not futures

The complexity around notifying tasks comes from the performance problems with polling a lot of futures as part of a single task. Tracking where in the futures tree a notify event occurred is one solution, but I think there's a better solution: spawn new tasks.

When a FuturesUnordered is polled, it can spawn its child futures as new tasks. Now task notifications triggered by a child future only cause the child to be re-polled, right up until the entire child has completed. The notify mechanism can be greatly simplified, the "task handle" can actually be a task handle, rather than a strange combination of task and metadata, and you reduce the amount of polling even further, since the FuturesUnordered and parent futures need not be polled whenever a child becomes ready with this approach.

For tokio, network events need not travel through the event loop twice: in response to a network event, tokio can just skip the notify() machinery, and directly re-poll the affected task.

There are a number of caveats to this approach, which I will address:

  • How does the future spawn new tasks onto the reactor? If the "generic task type" feature is implemented, then specific task types can implement a trait which provides this functionality. The FuturesUnordered combinator can then specifically implement Pollable<TaskT: Spawn>. With a non-generic task type, or if continuing to use thread-local storage, this is still possible, but relies on runtime checks to ensure the reactor can spawn new tasks.

  • Spawning a future typically involves a memory allocation, won't this hurt performance? FuturesUnordered already requires its futures to be of the same type as each other. The next section below suggests a scheme which sidesteps the need to allocate when spawning futures of the same type as ones that have already been spawned.

  • What if the task containing the FuturesUnordered future is cancelled, won't the other tasks keep running? spawn-ing a future returns a future representing the completion of that spawned task. Dropping this future can automatically cancel the spawned task. Therefore, if the FuturesUnordered task is dropped, it will automatically cause the tasks containing the child futures to be cancelled too.

Use an AnyMap to store tasks in a reactor

It's common to spawn lots of the same type of task to a reactor; by this I mean that the concrete type of the root future is the same across multiple different tasks. A reactor can take advantage of this by grouping tasks by the type of their root futures. Each group can be stored in a Vec<FutureT> (or whatever data structure is most appropriate) and an AnyMap can be used to store all the different groups.

Instead of each spawned future requiring its own allocation (due to the need to store tasks of different types in a single data structure) allocation is only necessary when the Vec needs to be reallocated. Now, tasks spawned from the same thread as the reactor will not typically cause any allocation.

Tasks spawned from another thread will still need an allocation in order to perform the transfer, but this is often the case for concurrent data structures anyway (Rust's own channel type for example, performs an allocation for each item).

Another specialised reactor

Having another specialised reactor in addition to tokio would help ensure futures become as general-purpose as possible. A good example might be a UI event loop reactor - perhaps as part of gtk-rs or winit.

Instead of these event loops being internally multi-threaded, they would use an "invoke"-style API. This way the reactor itself can remain single-threaded, but new tasks can be spawned onto the reactor by posting events from other threads. See Control.invoke() from winforms for an example of this approach.

References to the reactor and the current UI state would be obtainable from the current task, assuming the "generic task type" feature is implemented, and it would be possible to statically guarantee that such futures are only spawned to the UI thread.

Thanks for reading!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.