(Phaiax - 2019/12/1 - CC_BY_SA 4.0)
Lately I was porting some software from tokio/futures-1.0 to async-await. I somehow thought async-std was the successor of tokio and ported everything to async-std. 80% in, I noticed that my hyper dependency requires tokio and that it's not possible to replace tokio with async-std without also replacing hyper. Also, tokio and async-std try to solve the same problem. So I started a journey into the internals of the Rust async story to find out if it is possible to use both tokio and async-std at the same time. (tl;dr: it is.) I had heard of reactors and executors before, but there was much new stuff to discover.
I found it interesting, so I decided to write it down. The following is a dive into a mixture of async-std-1.0.1 and async-std-master. I guess that tokio does not differ that much in its architectural design. I will skip the async-await sugar, Pin and futures. Disclaimer: There may be misconceptions in here.
Syscalls are operating system functions. A typical program calls these functions to read from and write to files and network sockets, to open outgoing network connections or to accept incoming network connections. (There are many other syscalls, but these are the ones used most often.)
Usually each of these syscalls pauses the calling thread until the operation is done. But it is possible to configure these syscalls to return immediately (with the error code EWOULDBLOCK) instead of blocking the thread. The programmer then has to call the syscall again at a later time to see if new data is available.
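To see what this looks like from Rust, here is a minimal sketch using the standard library's nonblocking mode (the address is an arbitrary choice of mine):
use std::io::{ErrorKind, Read};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Arbitrary endpoint, only for illustration.
    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nonblocking(true)?;

    let mut buf = [0u8; 1024];
    match stream.read(&mut buf) {
        Ok(n) => println!("read {} bytes", n),
        // Nothing to read right now; we would have to try again later
        // (or better: let epoll tell us when it is worth trying).
        Err(e) if e.kind() == ErrorKind::WouldBlock => println!("would block"),
        Err(e) => return Err(e),
    }
    Ok(())
}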
To avoid the need of constantly polling these syscalls, the operating system provides another syscall, epoll, which can be used to wait for any of a group of files or network connections to become ready. That means that epoll is blocking as well, but on many files at once, not only on a single one like e.g. read does. epoll also tells the caller which files are now ready to proceed.
Writing programs in a blocking way is very easy. Just start threads for everything that can block. The code is interrupted if it cannot proceed and resumed when it can.
In contrast, async is hard. People went to great lengths only to make async feel like blocking syscalls. It feels kind of insane and overly complex compared to the simplicity of blocking syscalls. And it is only to avoid the overhead of creating system threads. (The numbers say that it's worth it.)
(This is about mio 0.6. Tokio has already pushed for mio 0.7.)
mio is a platform independent abstraction for epoll with extras.
The central type of mio is Poll. The user of a poll: Poll can register interest in the readiness of a few file descriptors and then call poll.poll(), which internally calls the epoll syscall. When poll.poll() returns, it tells us which file descriptors are now ready to proceed. poll.poll() also takes a timeout, after which it returns even if no file descriptors are ready.
The registration function looks like this:
poll.register(file: &impl Evented, token: Token, interest: Ready, opts: PollOpt)
// Ready is a bitmask with the following options: readable | writable | hup | aio | error | lio | pri
// PollOpt is a bitmask with the following options: level | edge | oneshot
When registering, it is possible to configure the type of interest we have:
- should it trigger if it is ready for reading and/or writing and/or if an error occurred and/or if the other side has hung up?
- should it trigger if there is data available or only if there is new data available?
- should the interest be disabled if it has triggered once (oneshot)?
Each registered thing is associated with a Token, which is just a wrapper around usize. When poll.poll() returns, it tells us which tokens are now ready to proceed.
(poll.register() is thread-safe. There is also a poll.deregister() function and a poll.reregister() function. The latter is made for the oneshot interest type.)
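To make this concrete, here is a minimal, hypothetical usage sketch of mio 0.6 (the address and the token value are arbitrary choices of mine), registering a TCP listener and blocking on poll.poll():
use mio::net::TcpListener;
use mio::{Events, Poll, PollOpt, Ready, Token};

fn main() -> std::io::Result<()> {
    const LISTENER: Token = Token(0);

    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr)?;

    let poll = Poll::new()?;
    // Register interest in "readable" (i.e. incoming connections), edge-triggered.
    poll.register(&listener, LISTENER, Ready::readable(), PollOpt::edge())?;

    let mut events = Events::with_capacity(1024);
    loop {
        // Blocks (no timeout) until at least one registered thing is ready.
        poll.poll(&mut events, None)?;
        for event in events.iter() {
            if event.token() == LISTENER {
                // accept() is nonblocking; keep accepting until it would block.
                while let Ok((stream, peer)) = listener.accept() {
                    println!("new connection from {}", peer);
                    drop(stream);
                }
            }
        }
    }
}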
Evented is a trait that represents things that can become ready. There are exactly three base types that are Evented. Any other thing that implements Evented must forward the trait methods to one of the three base implementors of Evented.
- The mio::net TCP and UDP networking primitives. They can be used almost like the std::net primitives, except that every syscall is configured as nonblocking by default.
- A wrapper around any raw unix file descriptor.
- A type called Registration. This is kind of similar to a std::sync::mpsc::channel::<()>(). Internally, mio has a second event mechanism which is made not for operating system triggered events, but for events within the boundary of the application. Whenever you create a Registration, you also get a paired instance of SetReadiness. The Registration is given to poll.register(), together with a token. The SetReadiness instance is Send and Sync, so it can be moved around freely and stored for later use. Whenever any other part of the program calls set_readiness() on the SetReadiness instance, poll.poll() will return with the corresponding token (see the sketch below).
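A minimal sketch of that application-internal event mechanism (the token value and the extra thread are my own illustration):
use mio::{Events, Poll, PollOpt, Ready, Registration, Token};
use std::thread;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    const USER_EVENT: Token = Token(42);

    // The Registration goes to the Poll, the SetReadiness can be moved anywhere.
    let (registration, set_readiness) = Registration::new2();

    let poll = Poll::new()?;
    poll.register(&registration, USER_EVENT, Ready::readable(), PollOpt::edge())?;

    // Some other part of the program signals readiness later.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        set_readiness.set_readiness(Ready::readable()).unwrap();
    });

    let mut events = Events::with_capacity(8);
    poll.poll(&mut events, None)?; // returns once set_readiness() was called
    assert!(events.iter().any(|e| e.token() == USER_EVENT));
    Ok(())
}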
It is possible to have multiple Poll instances where each instance waits for a different set of things to become ready. Nonetheless, a single Evented thing is not allowed to switch between Poll instances, even if it deregister()s first.
The reactor is a global, static resource. It consists of a system thread that calls poll.poll() in a loop {} and a way to register Evented things with its inner Poll. It also replaces the Token mechanism of mio with the std::task::Waker mechanism from the futures context. But the reactor thread itself will never poll futures, it only polls mio's Poll. It looks a bit like this:
static REACTOR: Reactor = ...;
struct Reactor {
poller: mio::Poll,
entries: Slab<Arc<Entry>>, // The Slab is a special array which allows the
// indices to be used as the mio::Tokens
...
}
#[derive(Debug)]
struct Entry {
token: mio::Token,
/// Tasks that are blocked on reading from this I/O handle.
readers: Mutex<Vec<Waker>>,
/// Tasks that are blocked on writing to this I/O handle.
writers: Mutex<Vec<Waker>>,
}
std::thread::spawn(|| {
    let mut events = mio::Events::with_capacity(1000);
    loop {
        // Block on the poller until at least one new event comes in.
        REACTOR.poller.poll(&mut events, None).unwrap();
        for mioevent in events.iter() {
            let token = mioevent.token();
            if let Some(entry) = REACTOR.entries.get(token.0) {
                let readiness = mioevent.readiness();
                // Wake up reader tasks blocked on this I/O handle.
                if readiness.is_readable() {
                    // drain() empties the Vec while iterating over it
                    for w in entry.readers.lock().unwrap().drain(..) {
                        w.wake();
                    }
                }
                // Wake up writer tasks blocked on this I/O handle.
                if readiness.is_writable() {
                    for w in entry.writers.lock().unwrap().drain(..) {
                        w.wake();
                    }
                }
            }
        }
    }
});
impl Reactor {
    fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> { ... }
    fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { ... }
}
The REACTOR is static, but unfortunately not public. Therefore only types defined within async-std are able to register themselves in the reactor. To solve this, and to connect the REACTOR with the futures, async-std provides the same set of networking primitives as std::net and mio::net.
There is another abstraction layer which I omit here: a wrapper called Watcher that handles automatic registration on the REACTOR on creation, automatic deregistration on drop, as well as some polling helpers.
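Very roughly, such a wrapper could look like this. This is my own simplified guess building on the Reactor sketch above, not async-std's actual Watcher code:
// Hypothetical simplification: owns the mio source and its reactor entry,
// registers on creation and deregisters on drop.
struct Watcher<T: mio::Evented> {
    entry: Arc<Entry>,
    source: Option<T>,
}

impl<T: mio::Evented> Watcher<T> {
    fn new(source: T) -> Watcher<T> {
        let entry = REACTOR.register(&source).expect("cannot register in reactor");
        Watcher { entry, source: Some(source) }
    }
}

impl<T: mio::Evented> Drop for Watcher<T> {
    fn drop(&mut self) {
        let source = self.source.take().unwrap();
        let _ = REACTOR.deregister(&source, &self.entry);
    }
}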
Let's look at UdpSocket as an example:
pub struct UdpSocket {
entry: Arc<Entry>,
miosocket: Option<mio::net::UdpSocket>,
}
impl UdpSocket {
    // simplified: the real bind() is async and accepts impl ToSocketAddrs
    pub fn bind(addr: SocketAddr) -> io::Result<UdpSocket> {
        let miosocket = mio::net::UdpSocket::bind(&addr)?;
        Ok(UdpSocket {
            entry: REACTOR.register(&miosocket)?,
            miosocket: Some(miosocket),
        })
    }
}
On creation of a UdpSocket, it is immediately registered in the REACTOR, and the reactor polls for read and write readiness by default. But there aren't any Wakers stored in entry.readers and entry.writers yet. So even if an event triggers, it will not have an effect outside of the REACTOR.
All the async functions of UdpSocket return Futures (after removing the async sugar). If these futures are polled, UdpSocket gets its hands on a std::task::Context, from which the Waker is obtained. The Wakers are then put into udpsocket.entry.readers/writers, so that the corresponding futures are woken when the REACTOR detects their readiness.
impl Future for the-return-value-of-UdpSocket::recv() {
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // recv() on the mio socket is a nonblocking syscall
        match self.socket.miosocket.recv(&mut self.outputbuf) {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Connecting the EXECUTOR to the REACTOR
                self.socket.entry.readers.lock().unwrap().push(cx.waker().clone());
                Poll::Pending
            }
            result => Poll::Ready(result),
        }
    }
}
Since the reactor thread will always drain() the readers and writers lists, each future.poll() will only result in a single waker.wake() call.
All networking primitives in async_std::net will always register themselves with async_std's static but private REACTOR. The requirement of mio, that Evented types must not hop between Poll instances, is therefore guaranteed, even if tokio's reactor is running in the same application as well.
By the way: std::task::Waker uses a handmade vtable for performance and portability. So each Waker is only two pointers wide and the reactor doesn't need to know any implementation details of the executor part of the async machinery.
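For reference, this is roughly how a Waker is assembled from the standard library's raw parts. The no-op behaviour here is just for illustration; a real executor would reschedule its task in wake():
use std::task::{RawWaker, RawWakerVTable, Waker};

// A Waker is just (data pointer, vtable pointer). The executor decides what
// the data pointer means and what wake() does; this example does nothing.
unsafe fn clone(data: *const ()) -> RawWaker { RawWaker::new(data, &VTABLE) }
unsafe fn wake(_data: *const ()) {}
unsafe fn wake_by_ref(_data: *const ()) {}
unsafe fn drop(_data: *const ()) {}

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

fn noop_waker() -> Waker {
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}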
The executor is a collection of task queues together with some system threads which poll these tasks if they need to be polled. A task is a future augmented with some data needed for the executor.
Side note: tokio's executor lives in an instance that the application must store somewhere. To spawn futures on the tokio executor, you need a handle to the executor. (tokio's executor is (or was?) single threaded. I wonder if this is a strict requirement when using non-Send futures.)
In contrast, the executor in async-std is static and global (but private) and can be accessed via the pub(crate) fn async_std::task::executor::schedule(runnable: Runnable) function.
(Another side note: the public API to the executor is async_std::task::spawn(), but we will get to that in the next layer of abstraction.)
There are two types of queues in the async-std executor. The first type lives in a global static POOL and the second type lives in a thread-local static PROCESSOR. There is a PROCESSOR for each executor thread. On the first call to schedule(), a handful of executor threads are started.
static POOL: Pool = ..;
struct Pool {
/// The global queue of tasks.
injector: Injector<Runnable>,
/// Handles to local queues for stealing work from worker threads.
stealers: Vec<Stealer<Runnable>>,
/// Used for putting idle workers to sleep and notifying them when new runnables come in.
sleepers: Sleepers,
}
thread_local! {
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
}
struct Processor {
worker: Worker<Runnable>,
    slot: Cell<Option<Runnable>>, // Optimization: fast path that bypasses the queue
slot_runs: Cell<u32>,
}
The executor uses a work stealing queue from crossbeam_deque. That's where the types Injector, Stealer and Worker come from. schedule() puts new tasks into PROCESSOR.slot or PROCESSOR.worker if it is called on an executor thread, or into POOL.injector if it is called from outside of an executor thread. PROCESSOR.slot is just an optimization to try to bypass the queue. This is how schedule() looks:
pub(crate) fn schedule(runnable: Runnable) {
    PROCESSOR.with(|processor| {
        if let Some(processor) = processor.get() {
            // Called on an executor thread: use the fast slot if it is free,
            // otherwise push onto this thread's local queue.
            match processor.slot.take() {
                None => processor.slot.set(Some(runnable)),
                Some(occupant) => {
                    processor.slot.set(Some(occupant));
                    processor.worker.push(runnable);
                }
            }
        } else {
            // Called from outside the executor: push onto the global queue.
            POOL.injector.push(runnable);
        }
    });
    POOL.sleepers.notify_one();
}
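As an aside, the crossbeam_deque building blocks can also be used standalone; a minimal sketch, unrelated to async-std's actual code:
use crossbeam_deque::{Injector, Worker};

fn main() {
    // Global queue plus one local queue whose Stealer other threads could hold.
    let injector: Injector<u32> = Injector::new();
    let worker: Worker<u32> = Worker::new_fifo();
    let stealer = worker.stealer();

    injector.push(1);
    worker.push(2);

    // Prefer local work, then refill from the global queue, then steal.
    let task = worker
        .pop()
        .or_else(|| injector.steal_batch_and_pop(&worker).success())
        .or_else(|| stealer.steal().success());

    println!("{:?}", task);
}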
Each executor thread tries to find and do work. If it can't find work, it backs off from consuming the CPU: first by yielding a few times, then by sleeping briefly, and only then by blocking on the sleepers mechanism.
std::thread::spawn(|| {
loop {
if let Some(runnable) = PROCESSOR.slot.get() { // but use the slot max 16 times in a row!
runnable.run();
} else if let Some(runnable) = PROCESSOR.from_thread_local_queue() {
runnable.run();
} else if let Some(runnable) = POOL.from_injector_queue() {
runnable.run();
} else if let Some(runnable) = POOL.stealers.from_some_other_PROCESSORs_local_queue() {
runnable.run();
} else {
            thread::yield_now(); // but at most three times in a row
thread::sleep(Duration::from_micros(10)); // then try sleeping once
POOL.sleepers.wait(); // if yielding and sleeping does not help, go to sleep and wait for being woken up
}
}
});
The trick with the blocking call to POOL.sleepers.wait(): schedule() will always call POOL.sleepers.notify_one(). This has the effect of always waking up one executor thread, if they aren't all awake already. With the sleeper mechanism, idle executor threads do not consume CPU time, but schedule() can be sure that new Runnables will be noticed and worked on. Sleepers is implemented via std::sync::Condvar, which is itself based on a special mutex.
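A minimal sketch of such a sleeper mechanism on top of Condvar (the struct and field names are mine, not async-std's):
use std::sync::{Condvar, Mutex};

struct Sleepers {
    sleeping: Mutex<usize>, // number of currently sleeping threads
    cvar: Condvar,
}

impl Sleepers {
    fn wait(&self) {
        let mut sleeping = self.sleeping.lock().unwrap();
        *sleeping += 1;
        // Block until notify_one() is called. A spurious wakeup just means
        // the worker loop runs one extra (harmless) iteration.
        sleeping = self.cvar.wait(sleeping).unwrap();
        *sleeping -= 1;
    }

    fn notify_one(&self) {
        // Wake up one sleeping executor thread, if any.
        if *self.sleeping.lock().unwrap() > 0 {
            self.cvar.notify_one();
        }
    }
}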
At the end of this level of abstraction, let's look at what Runnable does:
pub(crate) struct Runnable { inner_task: async_task::Task<async_std::Task> }
Runnable is a wrapper around async_std's Tasks. It has only one method, run(). Runnable adds abort-on-panic behavior and stores the currently running Task in a thread-local static variable for debugging and logging purposes. Other than that, it just propagates a call to run() to its inner_task.run().
Side note: Tasks can be named, but they are two allocations faster if they are unnamed.
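Naming goes through the task builder; a small usage sketch (the name string is arbitrary):
use async_std::task;

fn main() {
    task::block_on(async {
        // Unnamed: the common, slightly cheaper case.
        let fast = task::spawn(async { 1 + 1 });

        // Named: costs two extra allocations, but shows up in logs.
        let named = task::Builder::new()
            .name("my-background-task".to_string())
            .spawn(async { 2 + 2 })
            .expect("failed to spawn task");

        assert_eq!(fast.await + named.await, 6);
    });
}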
There are two Tasks at play: async_task::Task and async_std::Task. The creators of async_std have made the crate async-task as an optimization helper to save some allocations when spawning futures on executors. There is a lot of unsafe code and raw pointers in there. The async_task::Task provides memory for a tag, the boxed Future and the future's ::Output in a single allocation. It also provides the Waker and JoinHandle interfaces and handles partial drops of all these things, like some kind of fancy multi-Arc. (If I understood it correctly, even tokio could use async_task for optimizations.)
The main purpose of the Task abstraction is to have a JoinHandle that can be used to wait for a spawned task to finish and to retrieve the result of the finished task. The join handle can be dropped without stopping the associated task.
To spawn futures on the executor, async_std has one main API:
// simplified sketch of async_std::task::spawn
fn spawn<F>(f: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
{
    // wrap the future for logging on completion
    let task = async_std::task::Task::new(f);
    let (task, join_handle) = async_task::Task::new(task);
    schedule(Runnable::new(task));
    join_handle
}
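From the user's perspective the spawn/JoinHandle pair then behaves like this (a small usage sketch with hypothetical helper functions):
use async_std::task;

fn main() {
    task::block_on(async {
        // The JoinHandle is itself a future that resolves to the task's output.
        let handle = task::spawn(async { expensive_computation().await });
        let result: u64 = handle.await;
        println!("{}", result);

        // Dropping a JoinHandle does NOT cancel the spawned task;
        // it keeps running in the background.
        let _ = task::spawn(async { background_work().await });
    });
}

// Hypothetical stand-ins for real work.
async fn expensive_computation() -> u64 { 42 }
async fn background_work() {}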
async_std does not use the timeout functionality of mio::Poll::poll(). Instead it uses the futures_timer crate. This crate maintains a static but private global list of all timers. It starts its own system thread with a purpose similar to the REACTOR thread. That thread blocks until a new timer is added or until the nearest timer expires. The thread achieves blocking through the std::thread::park_timeout() and std::thread::Thread::unpark() functions. (I wonder why they don't use the timeout functionality of mio::Poll::poll(). Would that add synchronization overhead?)
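Using the crate looks like this (a minimal sketch; Delay is futures_timer's timer future):
use async_std::task;
use futures_timer::Delay;
use std::time::Duration;

fn main() {
    task::block_on(async {
        // Completes after one second, woken by futures_timer's own timer thread,
        // not by the mio-based REACTOR.
        Delay::new(Duration::from_secs(1)).await;
        println!("one second passed");
    });
}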
The reactors of both crates are tightly coupled with each crate's networking primitives. No mixing here.
The reactors only use the Waker API to talk to executors. Therefore, it does not hurt to execute tokio networking futures on the async_std executor and vice versa (if the Send trait bound allows it). Futures just wait to be woken up and execute themselves.
=> It is fine to mix. (But according to @yoshuawuyts, tokio's executor should better be started explicitly.)
Those were the main building blocks of async-std's async machinery. But there are many more appendices on top of that for many different purposes.
Anyway. Async is way more complicated than sync, and it's all just for the fucking performance.