Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Blog article: How does async work in async-std?

How does async work in async-std?

(Phaiax - 2019/12/1 - CC_BY_SA 4.0)

Lately I was porting a software from tokio/futures-1.0 to async-await. I somehow thought async-std was the successor of tokio and ported everything to async-std. 80% in, I noticed that my hyper dependency requires tokio and that it's not possible to replace tokio with async-std without also replacing hyper. Also, tokio and async-std try to solve the same problem. So I started a journey into the inners of the rust async story to find out if it is possible to use both tokio and async-std at the same time. (tl;dr: it is). I had heard of reactors and executors before, but there was much new stuff to discover.

I found it interesting, so I decided to write it down. The following is a dive into a mixture of async-std-1.0.1 and async-std-master. I guess that tokio does not differ that much in its architectural design. I will skip async-await sugar, Pin and futures. Disclaimer: There may be misconceptions in here.

non blocking syscalls

Syscalls are operating system functions. A typical program calls these functions to read from and write to files and network sockets and to open outgoing network connections or accepting incoming network connections. (There are many other syscalls, but these ones are used more often than the others.)

Usually each of these syscalls pauses the calling thread until the operation is done. But it is possible to configure these syscalls to return immediately (with the flag EWOULDBLOCK) instead of blocking the thread. The programmer has to call the syscall again at a later time, to see if new data is available.

To avoid the need of constantly polling these syscalls, the operation system provides another syscall epoll, which can be used to wait for any of a group of files or network connections to become ready. That means that epoll is blocking, but on many files at once, not only on a single one like e.g. read does. epoll also tells the caller, which files are now ready to proceed.

Writing programs in a blocking way is very easy. Just start threads for everything that can block. The code is interrupted if it cannot proceed further and resumed if it can.

In contrast, async is hard. People did go to very great lengths, only to make async feel like blocking syscalls. If feels kind of insane and overly complex compared to the simplicity of blocking syscalls. And it is only to avoid the overhead of creating system threads. (The numbers say that it's worth it.)

First layer of abstraction: Mio

(This is about mio 0.6. Tokio has already pushed for mio 0.7.)

mio is a platform independent abstraction for epoll with extras.

The central type of mio is Poll. The user of a poll: Poll can register interest in the readiness of a few file descriptors and then call poll.poll() which internally calls the epoll syscall. When poll.poll() returns, it tells which file descriptors are now ready to proceed. poll.poll() also takes a timeout, after which it returns, even if no file descriptors are ready.

The registration function looks like this:

poll.register(file: impl Evented, token: Token, interest: Ready, pollopt: Pollopt)
// Ready is a bitmask with the following options: readable | writable | hup | aio | error | lio | pri
// Pollopt is a bitmask with the following options: level | edge | oneshot

When registering, it is possible to configure the type of interest we have:

  • should it trigger if it is ready for reading and/or writing and/or if an error occurred and/or if the other side has hang up?
  • should it trigger if there is data available or only if there is new data available?
  • should the interest be disabled if it has triggered once (oneshot)?

Each registered thing is associated with a Token, which is just a wrapper around usize. When poll.poll() returns, it tells us which tokens are now ready to proceed.

(poll.register() is thread-safe. There is also a poll.deregister() function and a poll.reregister() function. The latter is made for the onehot interest type.)

Evented is a trait that represents things that can become ready. There are exactly three base types that are Evented. Any other thing that implements Evented must forward the trait methods to one of the three base implementors of Evented.

  1. The mio::net TCP and UDP networking primitives. They can be used almost like the std::net primitives, except that every syscall is configured as nonblocking by default.
  2. A wrapper around any raw unix file descriptor.
  3. A type called Registration. This is kind of similar to a std::sync::mpsc::channel<()>(). In its inners, mio has a second event mechanism which is made not for operating system triggered events, but for events within the boundary of the application. Whenever you create a Registration, you also get a paired instance of SetReadiness. The Registration is given to poll.register(), together with a token. The SetReadyness instance is Send and Sync, so it can be moved freely around and stored for later use. Whenever any other part of the program calls set_readiness() on the SetReadiness instance, poll.poll() will return with the corresponding token.

It is possible to have multiple Poll instances where each instance waits for a different set of things to become ready. None the less it's not allowed to for a single Evented thing to switch between Poll instances, even if it deregister()s first.

Second layer of abstraction: The reactor

The reactor is a global, static resource. It consists of a system thread that calls poll.poll() in a loop {} and a way to register Eventeds with its inner Poll. It also replaces the Token mechanism of mio with the std::task::Waker mechanism from the futures context. But the reactor thread itself will never poll futures, it only polls mios Poll. It looks a bit like this:

static REACTOR: Reactor = ...;

struct Reactor {
    poller: mio::Poll,
    entries: Slab<Arc<Entry>>, // The Slab is a special array which allows the 
                               // indices to be used as the mio::Tokens
    ...
}

#[derive(Debug)]
struct Entry {
    token: mio::Token,
    /// Tasks that are blocked on reading from this I/O handle.
    readers: Mutex<Vec<Waker>>,
    /// Tasks that are blocked on writing to this I/O handle.
    writers: Mutex<Vec<Waker>>,
}

std::thread::spawn(|| {
    loop {
        // Block on the poller until at least one new event comes in.
        REACTOR.poller.poll(&mut events, None)?;

        for mioevent in events.iter() {
            let token = mioevent.token();

            if let Some(entry) = entries.get(token.0) {
                let readiness = mioevent.readiness();

                // Wake up reader tasks blocked on this I/O handle.
                if mioevent.readiness.is_read() {
                    for w in entry.readers.drain() { // drain means clearing the Vec in the process
                        w.wake();
                    }
                }

                // Wake up writer tasks blocked on this I/O handle.
                if mioevent.readiness.is_write() {
                    for w in entry.writers.drain() {
                        w.wake();
                    }
                }
            }
        }
    }
});

impl Reactor {
    fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> { ... }
    fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()>
}

Connecting to the futures

The REACTOR is static, but unfortunately not public. Therefore only types defined within async-std are able to register themselves in the reactor.

To solve this, and to connect the REACTOR with the futures, async-std provides the same set of networking primitives as std::net and mio::net.

There is another abstraction layer which I omit here: A wrapper called Watcher, that handles automatic registration on the REACTOR on creation, automatic deregistration when dropping as well as some polling helpers.

Let's look at UdpSocket as an example:

pub struct UdpSocket {
    entry: Arc<Entry>,
    miosocket: Option<mio::net::UdpSocket>,
}

impl UdpSocket {
    // new()
    pub fn bind(addr: ToSocketAddrs) -> io::Result<UdpSocket> {
        let miosocket = mio::net::UdpSocket(addr);
        Ok(UdpSocket{
            entry: REACTOR.register(&miosocket)?,
            miosocket: Some(miosocket)
        })
    }
}

On creation of a UdpSocket, it is immediately registered in the REACTOR and the reactor polls for read and write readiness by default. But there aren't any Wakers stored in the entry.readers and entry.writers yet. So even if an event triggers, it will not have an effect outside of the REACTOR.

All the async functions from UdpSocket return Futures (after removing the async sugar). If these futures are polled, UdpSocket gets its hands on a std::task::Context, from which the Waker is obtained. The Wakers are then put into udpsocket.entry.readers/writers, so that the corresponding futures are awoken when the REACTOR detects their readiness.

impl Future for the-return-value-of-UdpSocket.recv() {
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.socket.miosocket.recv(&mut self.outputbuf) { // nonblocking syscall.
            WOULDBLOCK => {
                // Connecting the EXECUTOR to the REACTOR
                self.socket.entry.readers.push(cx.waker());
                Poll::Pending
            },
            result => {
                Poll::Ready(Result)
            }
        }
    }
}

Since the reactor thread will always drain() the readers and writers lists, each future.poll() will only have a single waker.wake() as an effect.

All networking primitives in async_std::net will always register themselves with async_std's static but private REACTOR. The requirement of mio, that Evented types must not hop between Poll instances, is therefore guaranteed. Even if tokios reactor is running in the same application as well.

By the way: std::Waker uses a handmade vtable for performance and portability. So each Waker is only two pointers wide and the reactor don't need to know any implementation details of the executor part of the async machinery.

On the other side of the river: The executor

The executor is a collection of task queues together with some system threads which poll these tasks if they need to be polled. A task is a future augmented with some data needed for the executor.

Side note: tokios executor lives in an instance that the application must store somewhere. To spawn futures on the tokio executor, you need a handle to the executor. (tokios executor is (or was?) single threaded. I wonder if this is a strict requirement when using non Send futures.)

In contrast, the executor in async-std is static and global (but private) and can be accessed via the pub(crate) fn async_std::task::executor::schedule(runnable: Runnable) function. (Another side note: The public API to the executor is async_std::task::spawn(), but we will get to that in the next layer of abstraction.)

There are two types of queues in the async-std executor. The first type lives in a global static POOL and the second type lives in a thread local static PROCESSOR. There is a PROCESSOR for each executor thread. On the first call to schedule(), a handful of executor threads are started.

static POOL: Pool = ..;

struct Pool {
    /// The global queue of tasks.
    injector: Injector<Runnable>,
    /// Handles to local queues for stealing work from worker threads.
    stealers: Vec<Stealer<Runnable>>,
    /// Used for putting idle workers to sleep and notifying them when new runnables come in.
    sleepers: Sleepers,
}

thread_local! {
    static PROCESSOR: OnceCell<Processor> = OnceCell::new();
}

struct Processor {
    worker: Worker<Runnable>,
    slot: Cell<Option<Runnable>>, // Optimization, 
    slot_runs: Cell<u32>,
}

The executor uses a work stealing queue from crossbeam_deque. Thats were the types Injector, Stealer and Worker come from. schedule() puts new tasks into PROCESSOR.slot or PROCESSOR.worker if it is called on an executor thread or into POOL.injector if it is called from outside of an executor thread. PROCESSOR.slot is just an optimization to try to bypass the queue. This is how schedule() looks:

pub(crate) fn schedule(runnable: Runnable) {
    if PROCESSOR exists in this thread {
        if PROCESSOR.slot.is_none() {
            PROCESSOR.slot = Some(runnable);
        } else {
            PROCESSOR.worker.add(runnable);
        }
    } else {
        POOL.injector.add(runnable);
    }
    POOL.sleepers.notify_one();
}

Each executor thread tries to find and do work. If it can't find work, it steps back from consuming the CPU: First by yielding a few times, then by sleeping 1 µs and only then by calling a blocking system call.

std::thread::spawn(|| {
    loop {
        if let Some(runnable) = PROCESSOR.slot.get() { // but use the slot max 16 times in a row!
            runnable.run();
        } else if let Some(runnable) = PROCESSOR.from_thread_local_queue() {
            runnable.run();
        } else if let Some(runnable) = POOL.from_injector_queue() {
            runnable.run();
        } else if let Some(runnable) = POOL.stealers.from_some_other_PROCESSORs_local_queue() {
            runnable.run();
        } else {
            thread::yield_now(); // but maximal three times in a row
            thread::sleep(Duration::from_micros(10)); // then try sleeping once
            POOL.sleepers.wait(); // if yielding and sleeping does not help, go to sleep and wait for being woken up
        }
    }
});

The trick with the blocking call to POOL.sleepers.wait(): schedule() will always call POOL.sleepers.notify_one(). This has the effect of always waking up one executor thread, if they aren't all awake already. With the sleeper mechanism, idle executor threads do not consume CPU time, but schedule() can be sure that new Runnables will be noticed and worked on. Sleepers is implemented via std::sync::CondVar, which itself based on a special mutex.

At the end of this level of abstraction, lets look what Runnable does:

pub(crate) struct Runnable{ inner_task: async_task::Task<async_std::Task> };

Runnable is a wrapper around async_stds Tasks. It has only one method run(). Runnable adds abort-on-panic behavior and stores the currently running Task into a thread local static variable for debugging and logging purposes. Other than that, it just propagates a call to run() to its inner_task.run()

Level up: Tasks and Tasks

Side Note: Tasks can be named, but they are two allocations faster if they are unnamed.

There are two Tasks at play: async_task::Task and async_std::Task. The creators of async_std have made the crate async-task as an optimization helper to save some allocations when spawning futures on executors. Much unsafe and raw pointers in there. The async_task::Task provides memory for a tag, the boxed Future and the futures ::Output in a single allocation. It also provides the Waker and JoinHandle interfaces and handles partial drops of all the things like some kind of a fancy multi-Arc. (If I understood it correctly, even tokio could use async_task for optimizations.)

The main purpose of the Task abstraction is to have a JoinHandle that can be used to wait for a spawned task to finish and to retrieve the result of the finished task. The join handle can be dropped without stopping the associated task.

To spawn futures on the executor, async_std has one main API:

fn spawn(f: impl Future + Send + 'static) -> JoinHandle<f::Output> {
    // wrap future for logging on completion

    let task = async_std::task::Task::new(f);
    let task, join_handle = async_task::Task::new(task);
    schedule(Runnable::new(task));
    join_handle
}

Async timers

async_std does not use the timeout functionality of mio::Poll::poll(). Instead it uses the futures_timer crate. This crate maintains a static but private global list of all timers. It starts its own system thread with a purpose similar to the REACTOR thread. That thread blocks until a new timer is added or until the nearest timer expires. The thread archives blocking through the std::thread::park_timeout() and std::thread:unpark() functions. (I wonder why they don't use the timeout functionality of mio::Poll::poll(). Would that add synchronization overhead?)

Mixing async_std and tokio

The reactors of both crates are tightly coupled with each crates networking primitives. No mixing here.

The reactors only use the Waker API to talk to executors. Therefore, it does not hurt to execute tokio networking futures on the async_std executor and vice versa. (If the Send trait bound allows it.) Futures just wait to be woken up and execute themselves.

=> It is fine to mix. (But according to @yoshuawuyts, tokios executor should better be started explicitly.)

Final thoughts

That were the main building blocks of async-std async machinery. But there are many more appendices on top of that for many different purposes.

Anyway. Async is way more complicated than sync and it's just for the fucking performance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.