The Rust async working group is currently actively discussing ways to improve async/await. Niko Matsakis documented the main goals and ideas in the async vision document.
As part of the improved async ecosystem, users should be able to use scoped tasks, where tasks can hold references to data owned by their parent task. They should also be able to wrap operations inside `async fn`s which so far haven't been a great fit for them. Examples of the latter are completion-based OS APIs (IOCP, io-uring) and async APIs that are implemented in other languages (like C++ boost-asio callbacks).
These new APIs require async tasks to run to completion, which means that dropping the associated `Future`s to cancel an active operation is no longer a valid option.
However, supporting cancellation is still an important requirement. For example, user interfaces often offer a Cancel button which should promptly cancel any ongoing operation when pressed, and web APIs (e.g. one executing a complex database query) should stop their work if the client disconnects.
To support those use cases, graceful (cooperative) cancellation models have been proposed.
This document lists the proposed models, and shows why `CancellationToken`s seem to be the most promising path for supporting graceful cancellation.
Let's start by looking at which models have been proposed so far:
Option A) `request_cancellation()`: The Async Shiny Future Design Doc Sketches and Notes by Niko Matsakis describe a new `Async` trait (a variant of the `Future` trait which guarantees running to completion). This trait contains a `request_cancellation()` function. Calling this function initiates cancellation, while regular `.await`ing of the `Async` would be used to wait for it to complete. In this case the `Async` might either resolve to some kind of `CancellationError`, or yield any other result (e.g. if the operation finished before the cancellation was processed, or if the `Async` type does not support cancellation).

Such an `Async` type could look like:
```rust
trait Async {
    type Output;

    /// # Unsafe conditions
    ///
    /// * Once polled, cannot be moved
    /// * Once polled, destructor must execute before memory is deallocated
    /// * Once polled, must be polled to completion
    unsafe fn poll(
        &mut self,
        context: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output>;

    /// Requests cancellation; poll must still be called.
    fn request_cancellation(&mut self);
}
```
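To make these semantics concrete, here is a minimal, self-contained sketch. It uses a simplified, safe stand-in trait (`AsyncOp`, a hypothetical name) instead of the unsafe `Async` trait above, a made-up `Countdown` operation, and a small polling loop with a no-op waker instead of a real executor. It shows that `request_cancellation()` only records the request, and that an operation which finishes before the request is observed still yields its normal result.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified, safe stand-in for the `Async` trait (hypothetical).
trait AsyncOp {
    type Output;
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>;
    fn request_cancellation(&mut self);
}

#[derive(Debug, PartialEq)]
struct CancellationError;

/// Hypothetical operation that needs `remaining` more polls to finish.
struct Countdown {
    remaining: u32,
    cancel_requested: bool,
}

impl AsyncOp for Countdown {
    type Output = Result<u32, CancellationError>;

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            // Finished before any cancellation request was processed.
            return Poll::Ready(Ok(42));
        }
        if self.cancel_requested {
            // The request is observed: stop early with an error.
            return Poll::Ready(Err(CancellationError));
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }

    fn request_cancellation(&mut self) {
        // Only records the request; `poll` must still be called to observe it.
        self.cancel_requested = true;
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `op` until it completes, requesting cancellation after
/// `cancel_after` unsuccessful polls (if given).
fn drive<A: AsyncOp>(mut op: A, cancel_after: Option<u32>) -> A::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        if Some(polls) == cancel_after {
            op.request_cancellation();
        }
        if let Poll::Ready(output) = op.poll(&mut cx) {
            return output;
        }
        polls += 1;
    }
}
```

The last case in the test below corresponds to "the operation finished before the cancellation was processed": the request arrives, but the result is still `Ok`.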
Option B) `poll_cancel()`: The completion crate written by Kai Jewson, based on the original proposal to support run-to-completion async functions, proposes a `poll_cancel()` function which is used instead of the normal `poll` function once an async operation is cancelled.
```rust
use core::pin::Pin;
use core::task::{Context, Poll};

pub trait CompletionFuture {
    /// The type of value produced on completion.
    type Output;

    /// Attempt to resolve the future to a final value, registering the current task for wakeup if
    /// the value is not yet available.
    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

    /// Attempt to cancel the future, registering the current task for wakeup if it has not finished
    /// cancelling yet.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
```
Option C) `async fn cancel()`: tokio-uring proposes an approach where `Future`s add an explicit `async fn cancel()`, which allows requesting that an ongoing operation stop and then waiting for it to complete:
```rust
let mut accept = tcp_listener.accept();

tokio::select! {
    (tcp_stream, addr) = &mut accept => { ... }
    _ = tokio::time::sleep(Duration::from_millis(100)) => {
        // Accept timed out, cancel the in-flight accept.
        match accept.cancel().await {
            Ok(_) => { ... } // operation canceled gracefully
            Err(Completed(tcp_stream, addr)) => {
                // The operation completed between the timeout
                // and cancellation request.
            }
            Err(_) => { ... }
        }
    }
}
```
Option D) `CancellationToken`s: A `CancellationToken` is an object which provides the ability to query whether cancellation of a task has been requested. `CancellationToken`s are passed along the call chain, and only the parts of the code which are interested in checking whether a task is cancelled interact with them. Additionally, `CancellationToken`s often provide the ability to register callbacks that get executed once cancellation is requested.

`CancellationToken`s are used to provide graceful cancellation support in many other programming languages. The Appendix provides more details about how other languages use `CancellationToken`s or variants of them.
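A minimal sketch of such a token using only the standard library may help. The method names (`is_cancelled`, `error_if_cancelled`, `when_cancelled`, `cancel`) follow the ones used later in this article and are assumptions, not an existing API. All clones share one state, each callback runs exactly once when `cancel()` is first called (or immediately if the token is already cancelled), and this sketch never removes a registered callback.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Error returned when an operation observes a cancellation request.
#[derive(Debug, PartialEq)]
pub struct CancellationError;

type Callback = Box<dyn FnOnce() + Send>;

struct Inner {
    cancelled: AtomicBool,
    callbacks: Mutex<Vec<Callback>>,
}

/// Cloneable handle; all clones observe the same cancellation state.
#[derive(Clone)]
pub struct CancellationToken {
    inner: Arc<Inner>,
}

impl CancellationToken {
    pub fn new() -> Self {
        CancellationToken {
            inner: Arc::new(Inner {
                cancelled: AtomicBool::new(false),
                callbacks: Mutex::new(Vec::new()),
            }),
        }
    }

    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::Acquire)
    }

    /// Returns `Err(CancellationError)` if cancellation was requested.
    pub fn error_if_cancelled(&self) -> Result<(), CancellationError> {
        if self.is_cancelled() { Err(CancellationError) } else { Ok(()) }
    }

    /// Registers a callback which runs once cancellation is requested.
    /// If the token is already cancelled, the callback runs immediately.
    pub fn when_cancelled(&self, callback: impl FnOnce() + Send + 'static) {
        let mut callbacks = self.inner.callbacks.lock().unwrap();
        if self.is_cancelled() {
            drop(callbacks);
            callback();
        } else {
            callbacks.push(Box::new(callback));
        }
    }

    /// Requests cancellation. Only the first call runs the callbacks.
    pub fn cancel(&self) {
        let callbacks = {
            let mut callbacks = self.inner.callbacks.lock().unwrap();
            if self.inner.cancelled.swap(true, Ordering::AcqRel) {
                return; // already cancelled
            }
            std::mem::take(&mut *callbacks)
        };
        // Run callbacks outside the lock so they may use the token themselves.
        for callback in callbacks {
            callback();
        }
    }
}
```

Setting the flag and taking the callback list under the same lock that `when_cancelled` uses ensures a callback registered concurrently with `cancel()` is neither lost nor run twice.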
In order to better understand how the different cancellation mechanisms would work in a real application, we will look at a realistic use case: we want to implement an asynchronous function which copies a file from a source location to a destination, where all reads and writes of file data are asynchronous (e.g. both files might be stored in a cloud-based object storage service).
This function might be part of a web service, and should be cancelled if the user aborts the request.
A basic implementation could look like:
```rust
async fn copy_file(src_location: Uri, dest_location: Uri) -> Result<(), CopyError> {
    // Create client objects for interacting with remote file storage.
    // This will also connect to the remote locations.
    let mut src_stream = FileClient::open_read(src_location).await?;
    let mut dest_stream = FileClient::open_write(dest_location).await?;

    let mut buffer = [0u8; 64 * 1024];
    loop {
        let n = src_stream.read(&mut buffer[..]).await?;
        if n == 0 {
            // We copied all data
            break;
        }
        dest_stream.write_all(&buffer[..n]).await?;
    }

    Ok(())
}
```
The `read()` and `write()` functions might be implemented using a completion-based IO framework. Assuming we use io-uring to build the client, the requirements would be:

- Starting the operation is performed by queuing an `IORING_OP_READ` operation.
- If the operation is cancelled, an `IORING_OP_ASYNC_CANCEL` operation needs to be submitted to the kernel.
- Receiving a kernel notification that either the read or the cancel operation has finished needs to unblock the `read()` operation.
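These three requirements can be sketched as a small state machine. This is a simulation under stated assumptions: no real io-uring syscalls are made, the `ReadOp`, `ReadState` and `Completion` names are hypothetical, and kernel completions are delivered by calling `on_completion` directly. It also simplifies one detail: a real ring additionally posts a separate completion for the `IORING_OP_ASYNC_CANCEL` request itself.

```rust
/// Hypothetical bookkeeping for one read on a completion-based API.
#[derive(Debug, PartialEq)]
enum ReadState {
    Idle,
    Submitted,       // IORING_OP_READ queued
    CancelSubmitted, // IORING_OP_ASYNC_CANCEL queued as well
    Done(Result<usize, ReadError>),
}

#[derive(Debug, PartialEq)]
enum ReadError {
    Cancelled,
}

/// Simulated completion events for the read.
enum Completion {
    ReadFinished(usize),
    ReadCancelled,
}

struct ReadOp {
    state: ReadState,
    submitted: Vec<&'static str>, // log of operations handed to the "kernel"
}

impl ReadOp {
    fn new() -> Self {
        ReadOp { state: ReadState::Idle, submitted: Vec::new() }
    }

    fn submit(&mut self) {
        assert_eq!(self.state, ReadState::Idle);
        self.submitted.push("IORING_OP_READ");
        self.state = ReadState::Submitted;
    }

    /// Cancellation only submits a cancel request; it does not finish the read.
    fn request_cancel(&mut self) {
        if self.state == ReadState::Submitted {
            self.submitted.push("IORING_OP_ASYNC_CANCEL");
            self.state = ReadState::CancelSubmitted;
        }
    }

    /// Either kind of completion moves the op to `Done`, which is what
    /// would unblock the waiting `read()`.
    fn on_completion(&mut self, event: Completion) {
        let result = match event {
            Completion::ReadFinished(n) => Ok(n),
            Completion::ReadCancelled => Err(ReadError::Cancelled),
        };
        self.state = ReadState::Done(result);
    }

    fn is_done(&self) -> bool {
        matches!(self.state, ReadState::Done(_))
    }
}
```

The second scenario in the test shows the race the text hints at: the read can still complete successfully after cancellation was requested, so the waiting side must accept either outcome.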
The function can be called from a web API handler like:
```rust
async fn handle_request(request: Request) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;
    copy_file(copy_file_request.src(), copy_file_request.dest()).await?;
    Ok(Response::ok())
}
```
In this case, the `handle_request()` function would be cancelled by the web server framework.

For options A), B) and C), this means the `handle_request()` function would generate a `Future` which has some variant of a cancellation method. That method then needs to delegate cancellation to interior `Future`s, like the ones generated for `FileClient::read` and `FileClient::write`, which in turn have to submit the necessary cancellation operation on io-uring and wait for the result.

For option D), the `handle_request` function would carry an additional `CancellationToken`, either as an explicit parameter or as a hidden/implicit parameter (part of the async `Context`).
Now that we've defined the problem statement, let's look at how the various options would work out here:
With Option A), all of `handle_request`, `copy_file` and `FileClient::{open_read, open_write, read, write}` would generate `Future`/`Async` objects which contain a `request_cancellation()` function.
For `FileClient::read`, this function would be implemented by dispatching the `IORING_OP_ASYNC_CANCEL` operation.
For `copy_file`, the `request_cancellation` function would need to be compiler-generated. It should dispatch to the `request_cancellation` of whichever `Future` is active at the moment:
```rust
impl CompilerGeneratedCopyFileFuture {
    fn request_cancellation(&mut self) {
        match self.state { // This indicates the await point that had been reached
            0 => return, // Not polled
            1 => self.open_read_future.request_cancellation(), // Waiting on FileClient::open_read
            2 => self.open_write_future.request_cancellation(), // Waiting on FileClient::open_write
            3 => self.read_future.request_cancellation(), // Waiting on FileClient::read
            4 => self.write_future.request_cancellation(), // Waiting on FileClient::write
            _ => unreachable!(),
        }
    }
}
```
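The dispatch logic above can also be written out by hand as an enum-based state machine. This sketch uses hypothetical types (`RequestCancellation`, `LeafOp`, `CopyFileState`) and only works this simply because the caller has exclusive (`&mut`) access to the state machine between polls.

```rust
/// Minimal stand-in for "a type with a `request_cancellation` method".
trait RequestCancellation {
    fn request_cancellation(&mut self);
}

/// Hypothetical leaf operation, e.g. the future of `FileClient::read`.
struct LeafOp {
    cancel_requested: bool,
}

impl RequestCancellation for LeafOp {
    fn request_cancellation(&mut self) {
        // A real leaf would submit IORING_OP_ASYNC_CANCEL here.
        self.cancel_requested = true;
    }
}

/// Hand-written equivalent of the state machine generated for `copy_file`:
/// each variant owns the sub-future awaited at that point.
enum CopyFileState {
    NotPolled,
    OpenRead(LeafOp),
    OpenWrite(LeafOp),
    Read(LeafOp),
    Write(LeafOp),
    Done,
}

impl RequestCancellation for CopyFileState {
    fn request_cancellation(&mut self) {
        match self {
            // Nothing is running (yet or anymore), so there is nothing to forward.
            CopyFileState::NotPolled | CopyFileState::Done => {}
            // Forward to whichever sub-future is currently active.
            CopyFileState::OpenRead(op)
            | CopyFileState::OpenWrite(op)
            | CopyFileState::Read(op)
            | CopyFileState::Write(op) => op.request_cancellation(),
        }
    }
}
```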
This looks somewhat reasonable and doable at first glance. However, it requires that we can reliably identify the current execution state of a `Future` and call the respective method on the sub-`Future`. We will see later on why this can be problematic.
With Option B), the implementation of this method would look rather similar to Option A). The main difference is that polling the original `Future` is stopped, and a cancel path is entered instead.
One immediately visible difference is that in this case the caller of a method would no longer have any visible indication of whether the cancellation was successful and the method stopped executing early, or whether the cancellation did not have any impact: `poll_cancel()` returns `Poll<()>`, so there is no return value which could indicate this.
Option C) does not immediately seem to offer a mechanism to delegate a cancellation request from `handle_request` down to the `FileClient` operations.
`async fn cancel()` on the io-uring primitives is manually implemented and not compiler-generated. It is also not part of any interface/trait. Therefore, even if it were available on the low-level io-uring APIs, the cancellation capability would not be forwarded through the `async fn`s `copy_file` and `handle_request`, which do not make use of hand-written `Future`s.
This means that while this variant might work for graceful cancellation deep within a particular framework, it won't compose very well in bigger applications. Another mechanism, like Option A), B) or D), would be required for this task.
Due to this limitation, we will continue the comparison without Option C).
With Option D), if we assume a `CancellationToken` is passed implicitly as part of `Context`, no changes would be required for compiler-generated `Future`s. The token would automatically be forwarded from `handle_request` to `copy_file` to `FileClient::{open_read, open_write, read, write}`.
Assuming the `FileClient` SDK is built on something like tokio-uring and delegates to its read/write operations, even the `FileClient` SDK does not have to be modified.
The only methods that need to become cancellation-aware would be the tokio-uring IO implementations.
Very primitive cancellation support in such an IO framework could look like:
```rust
impl IoHandle {
    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> {
        // Return an `std::task::CancellationError` in case the task is cancelled,
        // which is convertible into an `std::io::Error`
        std::task::with_current_cancellation_token(|token| {
            token.error_if_cancelled()
        })?;

        // Perform the actual IO implementation
        self.inner.read(buffer).await
    }
}
```
This means that at the start of each IO operation, we check once whether the current task is cancelled, and if so, return an error and skip the operation.
This is trivial to implement, but has the downside that cancellation has no effect once the operation is already ongoing.
A better implementation should react immediately to cancellation requests, and thereby provide both lower latency and higher assurance that cancellation actually happens.
As discussed earlier, for io-uring we would need to submit an explicit `IORING_OP_ASYNC_CANCEL` operation to the OS to cancel an ongoing operation.
Typical `CancellationToken` implementations allow this by letting users attach callbacks that are executed immediately when cancellation is requested. They are somewhat similar to Option A)'s `fn request_cancellation()`, but are only used in the parts of the code which require special cancellation handling.
```rust
impl IoHandle {
    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> {
        std::task::with_current_cancellation_token(|token| async {
            let op = Mutex::new(UringOp::read(self.fd, buffer));

            // Attach a callback that will be immediately executed when the operation
            // is cancelled. The callback will be deregistered once the guard
            // returned from `when_cancelled` is dropped. It can therefore reference
            // objects on the "async stack" and can be added to an intrusive list
            // of cancellation callbacks, requiring no memory allocation.
            // This is similar to C++ `stop_callback`s
            // (https://en.cppreference.com/w/cpp/thread/stop_callback)
            // The guard must be bound to a name: `let _ = ...` would drop it
            // (and deregister the callback) immediately.
            let _guard = token.when_cancelled(|| {
                let mut op = op.lock().unwrap();
                if !op.is_active() {
                    return;
                }
                // Submits an IORING_OP_ASYNC_CANCEL operation
                op.cancel();
            });

            // Start the async operation
            op.lock().unwrap().submit();

            // Wait for the result. For simplification purposes we assume
            // there is another async function that allows us to wait on this
            // (e.g. through a oneshot channel, `notify()` handle or `ManualResetEvent`).
            // `wait_for_completion` is a hypothetical helper.
            wait_for_completion(&op).await;

            let result = op.lock().unwrap().result();
            result
        }).await
    }
}
```
This code snippet is simplified, and a real implementation might at this point rather require a hand-written state machine than an async function. However, it should convey the basic idea of how support for timely cancellation can be added using additional callbacks.
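The guard-based registration described in the comments above can be sketched with the standard library alone. This is a hypothetical `CancellationToken` (not the `tokio_util` type of the same name) that stores callbacks in a `HashMap` keyed by an id, so unlike the intrusive list mentioned above it does allocate. Dropping the returned `CallbackGuard` removes the callback again, and `#[must_use]` warns if the guard is ignored entirely.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

type Callback = Box<dyn FnOnce() + Send>;

#[derive(Default)]
struct Registry {
    next_id: u64,
    callbacks: HashMap<u64, Callback>,
}

/// Token whose callbacks can be deregistered again (hypothetical API).
#[derive(Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
    registry: Arc<Mutex<Registry>>,
}

/// Deregisters its callback when dropped, similar in spirit to C++ `std::stop_callback`.
pub struct CallbackGuard {
    id: u64,
    registry: Arc<Mutex<Registry>>,
}

impl Drop for CallbackGuard {
    fn drop(&mut self) {
        self.registry.lock().unwrap().callbacks.remove(&self.id);
    }
}

impl CancellationToken {
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Registers `callback`; it runs on `cancel()` unless the returned guard
    /// was dropped first. Runs immediately if the token is already cancelled.
    #[must_use = "dropping the guard deregisters the callback immediately"]
    pub fn when_cancelled(&self, callback: impl FnOnce() + Send + 'static) -> CallbackGuard {
        let mut registry = self.registry.lock().unwrap();
        let id = registry.next_id;
        registry.next_id += 1;
        if self.is_cancelled() {
            drop(registry);
            callback();
        } else {
            registry.callbacks.insert(id, Box::new(callback));
        }
        CallbackGuard { id, registry: self.registry.clone() }
    }

    /// Requests cancellation; runs all still-registered callbacks once.
    pub fn cancel(&self) {
        let callbacks: Vec<Callback> = {
            let mut registry = self.registry.lock().unwrap();
            if self.cancelled.swap(true, Ordering::AcqRel) {
                return;
            }
            registry.callbacks.drain().map(|(_, cb)| cb).collect()
        };
        for callback in callbacks {
            callback();
        }
    }
}
```

Note that `#[must_use]` does not fire for `let _ = token.when_cancelled(...)`, because that pattern explicitly discards the value and drops the guard at the end of the statement; binding it to a named variable such as `_guard` keeps the callback registered until the end of the scope.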
If the token were not provided via `Context`, functions like `handle_request` and `copy_file` would have to be modified into the following form to become cancellation-aware:
```rust
async fn handle_request(
    request: Request,
    cancel_token: CancellationToken,
) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;
    copy_file(copy_file_request.src(), copy_file_request.dest(), cancel_token).await?;
    Ok(Response::ok())
}
```
We will take a look at the pros and cons of this approach in a follow-up article.
The main benefit of `CancellationToken`s, as shown, is that they are extremely composable. Most code either does not have to be cancellation-aware at all (if the token is forwarded implicitly), or only has the responsibility of forwarding the token. The trickier parts of cancellation support only have to be added at the IO library layer, which means that most people will not have to actively worry about it. A typical application author would only have to:

- Handle `CancellationError`s returned from a cancelled function
- Forward cancellation tokens manually, if they are not forwarded implicitly
- Call `CancellationToken::cancel()` or e.g. `TaskHandle::cancel()` to trigger cancellation of subtasks
Now that we have taken a look at how the cancellation strategies integrate into the basic service implementation, we will add a few new requirements and see how well each strategy copes with them:
Up to now we have built a `copy_file` operation which unconditionally aborts as soon as cancellation is initiated. This is great from a determinism point of view: if users of the service abort transactions, those won't silently continue to run in the background and consume resources there.
However, there might be reasons for relaxing this requirement. Let's imagine we are the owners of this service, and one day learn that one of our key customers is not able to use the `copy_file` API. The reason is that they have configured their HTTP client timeout to 30s, and cannot easily change it, since it's an embedded device whose source code has been lost. Over time, however, the files they need to copy have grown larger, and now the copy takes 40s.
In order to help this customer, we want to slightly modify the `copy_file` operation and allow it to run to completion once at least 70% of the file has been copied.
When using `CancellationToken`s, we can achieve this goal by replacing the `CancellationToken` passed to functions with one that never signals a cancellation request, thereby breaking the automatic cancellation chain. This could look like:
```rust
async fn copy_file(src_location: Uri, dest_location: Uri) -> Result<(), CopyError> {
    // Create client objects for interacting with remote file storage.
    // This will also connect to the remote locations.
    let mut src_stream = FileClient::open_read(src_location).await?;
    let mut dest_stream = FileClient::open_write(dest_location).await?;
    let file_size = src_stream.len() as u64;
    let mut buffer = [0u8; 64 * 1024];
    let mut copied = 0u64;

    // This part of the copy is cancellable: it runs until 70% of the file
    // has been copied (written without division to handle empty files).
    while copied * 100 < file_size * 70 {
        let n = src_stream.read(&mut buffer[..]).await?;
        if n == 0 {
            break; // unexpected end of file
        }
        dest_stream.write_all(&buffer[..n]).await?;
        copied += n as u64;
    }

    // Replaces the task-local cancellation token with one which never signals
    // cancellation. Alternatively, for explicit cancellation-token passing,
    // pass a new token to functions instead of forwarding the existing one.
    std::task::uncancellable(async {
        while copied < file_size {
            let n = src_stream.read(&mut buffer[..]).await?;
            if n == 0 {
                break;
            }
            dest_stream.write_all(&buffer[..n]).await?;
            copied += n as u64;
        }
        Ok::<(), CopyError>(())
    }).await?;

    Ok(())
}
```
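The effect of the uncancellable section can be simulated synchronously. In this sketch the token is a bare flag, the copy is modeled with a byte counter instead of real streams, and the `on_chunk` hook lets a test request cancellation at a chosen point. Before the 70% mark a cancellation request aborts the copy; afterwards the token is simply no longer consulted, which is what swapping in a never-cancelled token achieves.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Minimal token for this sketch: only a flag (hypothetical).
struct CancellationToken(AtomicBool);

impl CancellationToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

#[derive(Debug, PartialEq)]
enum CopyResult {
    Completed(u64),
    Cancelled { copied: u64 },
}

/// Simulated copy of `file_size` bytes in `chunk`-sized pieces. `on_chunk`
/// runs after every chunk so a caller can request cancellation mid-copy.
/// Once at least 70% has been copied, the token is no longer checked.
fn copy_file(
    file_size: u64,
    chunk: u64,
    token: &CancellationToken,
    mut on_chunk: impl FnMut(u64),
) -> CopyResult {
    assert!(chunk > 0, "chunk size must be positive");
    let mut copied = 0;
    while copied < file_size {
        let cancellable = copied * 100 < file_size * 70;
        if cancellable && token.is_cancelled() {
            return CopyResult::Cancelled { copied };
        }
        copied += chunk.min(file_size - copied);
        on_chunk(copied);
    }
    CopyResult::Completed(copied)
}
```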
With Option A), the way to prevent the cancellation request from being forwarded seems to be inserting a custom hand-written `Future`/`Async` into the call chain whose `request_cancellation` is a no-op.
E.g.:
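```rust
/// Wraps an `Async` in another `Async` which prevents cancellation requests
/// from flowing forward.
struct Uncancellable<F>(F);
fn uncancellable<F: Async>(inner: F) -> Uncancellable<F> { Uncancellable(inner) }
impl<F: Async> Async for Uncancellable<F> {
    type Output = F::Output;
    // Polling is forwarded unchanged.
    unsafe fn poll(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll<F::Output> { self.0.poll(cx) }
    // Deliberately a no-op: the request is not forwarded to the wrapped operation.
    fn request_cancellation(&mut self) {}
}
```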
The actual `copy_file` method would then look equivalent to the Option D) version.
With Option B), the solution seems similar to Option A), but with the `poll_cancel()` function of the `Uncancellable` wrapper continuing to call the normal `poll` function of the wrapped `Future`.
Up to now our service provided an API to copy a single file at a time. But why limit ourselves to that, when we can offer users the ability to perform multiple copies at once? That will save the latency of calling the copy API multiple times [1].
[1] In reality the decision is not that easy, because a multi-copy API makes error handling more complicated, and multiplexing protocols like HTTP/2 can offset the latency gains. But it makes a good example.
In order to allow this, we modify the `handle_request` API to spawn one subtask per copy, and wait for all copies to complete. Here we use the scope/task_set API proposed by Carl Lerche in Exploring ways to make async Rust easier. This API allows us to run fully independent tasks for each copy:
```rust
async fn handle_request(
    request: Request,
    cancel_token: CancellationToken,
) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;

    std::task::scope(async |scope| {
        let mut task_set = scope.task_set();

        // Start a subtask for each copy operation
        for (src, dest) in copy_file_request.files().iter() {
            task_set.spawn(async move {
                copy_file(src.clone(), dest.clone()).await
            });
        }

        // If one of the tasks finishes with an error, awaiting the `TaskSet` will
        // yield the error and lead us to return from the scope and function
        // due to the use of the `?` operator. This will cancel all other outstanding
        // tasks.
        while let Some(()) = task_set.next().await? {
            // copying requires no result
        }
        Ok::<(), ApiError>(())
    }).await?;

    Ok(Response::ok())
}
```
Offering this API was super easy! But how do the various cancellation systems fit into it?
We will again start by looking at the `CancellationToken`-based solution, since its integration is the most obvious:
We will use the ability to attach cancellation callbacks to the current `CancellationToken` when entering the scope, and cancel all tasks which are part of the scope by calling a `.cancel()` function on their task handles. That `.cancel()` function might either be a framework-internal function, or could be part of the public `TaskHandle` API. As a result of that cancellation request, the `CancellationToken` which is referenced inside the child tasks will transition into the cancelled state.
The implementation would therefore look similar to:
```rust
async fn scope<F, R>(scope_fn: F) -> R
where
    F: AsyncFnOnce(ScopeHandle) -> R,
{
    // Shared state which tracks all child tasks spawned within this scope.
    let scope_state = Mutex::new(ScopeState::new());

    std::task::with_current_cancellation_token(|token| async {
        // The guard keeps the callback registered until the scope ends.
        let _guard = token.when_cancelled(|| {
            // When the current task is cancelled, send a cancellation request to
            // all child tasks.
            for child_task in scope_state.lock().unwrap().tasks() {
                child_task.cancel();
            }
        });

        let scope_handle = scope_state.lock().unwrap().handle();
        // Call the provided function
        let result = scope_fn(scope_handle).await;

        // Cancel all remaining subtasks. Since subtasks may borrow from the
        // parent, a real implementation must also wait for them to finish here.
        for child_task in scope_state.lock().unwrap().tasks() {
            child_task.cancel();
        }

        // And return the result
        result
    }).await
}
```
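The scope's forwarding of cancellation to its children can also be modeled as a hierarchy of tokens. The sketch below is hypothetical (it resembles, but is not, the `child_token` method of `tokio_util::sync::CancellationToken`): cancelling a parent cancels all of its children transitively, while cancelling one child leaves its parent and siblings untouched. For brevity it never prunes children that were cancelled individually.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Token with child tokens (hypothetical API).
#[derive(Clone, Default)]
pub struct CancellationToken {
    inner: Arc<Inner>,
}

#[derive(Default)]
struct Inner {
    cancelled: AtomicBool,
    children: Mutex<Vec<CancellationToken>>,
}

impl CancellationToken {
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::Acquire)
    }

    /// Creates a token that is cancelled whenever `self` is cancelled,
    /// but can also be cancelled on its own without affecting `self`.
    pub fn child_token(&self) -> CancellationToken {
        let child = CancellationToken::default();
        let mut children = self.inner.children.lock().unwrap();
        if self.is_cancelled() {
            child.cancel(); // parent already cancelled: child starts cancelled
        } else {
            children.push(child.clone());
        }
        child
    }

    pub fn cancel(&self) {
        let children = {
            let mut children = self.inner.children.lock().unwrap();
            if self.inner.cancelled.swap(true, Ordering::AcqRel) {
                return; // already cancelled
            }
            std::mem::take(&mut *children)
        };
        // Propagate outside the lock to avoid holding it while recursing.
        for child in children {
            child.cancel();
        }
    }
}
```

In the scope example, the scope would hand one child token to each subtask it spawns, so cancelling the task that owns the scope reaches every subtask without any per-`.await` dispatch.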
With Option A), the `scope` function would likely need to be a hand-written `Future`/`Async`, where the implementation forwards the `request_cancellation` call to all subtasks:
```rust
impl ScopeFuture {
    fn request_cancellation(&mut self) {
        let scope_state = self.scope_state.lock().unwrap();
        for child_task in scope_state.tasks() {
            child_task.request_cancellation();
        }
    }
}
```
While this seems reasonable, there is a problem with this implementation as soon as multi-threading gets involved. In this case the `ScopeFuture::request_cancellation` method cannot directly call `CompilerGeneratedCopyFileFuture::request_cancellation`, because that future is potentially being polled on a different thread. While the cancellation of the scope is ongoing, the execution state of `CompilerGeneratedCopyFileFuture` might advance, and the currently active `Future`/`Async` to which `request_cancellation` needs to be dispatched will change. To reliably identify the sub-future to which the cancellation call needs to be dispatched, the execution of the subtasks needs to be paused.
Options to do this are:

- Insert additional synchronization points at each `.await` point (state transition) to allow reliable access to the currently active sub-`Future`. This would be rather bad for performance, since each `.await` would now come with a mandatory compiler-inserted lock.
- Queue the `request_cancellation` call onto the thread which actually executes the task. The actual call would then happen later, in the context of the task, at a point where it is guaranteed not to be executing. Since cancellation is a binary state, this could likely be done by just setting a flag on the task state, queuing the task for execution, and inserting a check for that flag before the task's future is polled again:

```rust
impl ScopeFuture {
    fn request_cancellation(&mut self) {
        let scope_state = self.scope_state.lock().unwrap();
        for child_task in scope_state.tasks() {
            // Only record the request and schedule the task for execution.
            child_task.mark_cancelled();
            child_task.waker().wake_by_ref();
        }
    }
}

impl Task {
    unsafe fn poll(
        &mut self,
        context: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output> {
        // The future is not executing right now, so the currently active
        // sub-future can be identified reliably.
        if self.marked_cancelled() {
            self.future.request_cancellation();
        }
        self.future.poll(context)
    }
}
```
This method is reasonable. However, it requires the implementation of the scope to be aware of some executor/runtime details, or the specification of additional APIs which allow for deferred cancellation.
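The deferred approach can be sketched with a per-task flag. In this hypothetical setup (`CancellableFuture`, `CancelFlag`, `Task` and `WaitForever` are made-up names), other threads only set the flag and wake the task; the thread that polls the task forwards `request_cancellation` to the future on its next poll, when the future is guaranteed not to be running.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for a future with a `request_cancellation` method.
trait CancellableFuture {
    type Output;
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>;
    fn request_cancellation(&mut self);
}

/// The only part of a task that other threads touch.
#[derive(Default)]
struct CancelFlag {
    requested: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl CancelFlag {
    /// Callable from any thread: record the request and schedule the task.
    fn mark_cancelled(&self) {
        self.requested.store(true, Ordering::SeqCst);
        if let Some(waker) = self.waker.lock().unwrap().as_ref() {
            waker.wake_by_ref();
        }
    }
}

/// Executor-side wrapper; only the polling thread accesses `future`.
struct Task<F> {
    future: F,
    flag: Arc<CancelFlag>,
    forwarded: bool,
}

impl<F: CancellableFuture> Task<F> {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
        *self.flag.waker.lock().unwrap() = Some(cx.waker().clone());
        // The future is not executing right now, so forwarding is race-free.
        if !self.forwarded && self.flag.requested.load(Ordering::SeqCst) {
            self.forwarded = true;
            self.future.request_cancellation();
        }
        self.future.poll(cx)
    }
}

/// Hypothetical leaf future which only finishes once cancelled.
#[derive(Default)]
struct WaitForever {
    cancelled: bool,
}

impl CancellableFuture for WaitForever {
    type Output = &'static str;
    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.cancelled { Poll::Ready("cancelled") } else { Poll::Pending }
    }
    fn request_cancellation(&mut self) {
        self.cancelled = true;
    }
}

/// Waker that counts how often the task was scheduled.
struct CountingWaker(AtomicUsize);
impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Storing the waker before checking the flag, while `mark_cancelled` sets the flag before reading the waker, means a request is either seen by the current poll or triggers a wake-up for the next one.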
With Option B), the same multi-threading challenges as in Option A) would apply. However, queuing the cancellation request to happen later inside the task context would not work: the `poll_cancel()` method needs to perform work immediately. Therefore it is not obvious at the moment what the implementation would look like.
So far we have assumed that all parts of our service can be naturally expressed using async functions and IO. However, this is not always the case. Sometimes no async functions are available, and we have to fall back to regular synchronous functions.
Async runtimes (like tokio and async-std in Rust) commonly dispatch those operations to a thread pool, so that the potentially long-running, thread-blocking operation does not prevent other async tasks from running.
Tokio offers the `spawn_blocking` function for this purpose.
Now let's assume our service in some cases does not perform network file copy operations, but actually reads or writes files on a local drive, and we don't have any async file IO API available. In that case we definitely want to dispatch to synchronous code using some kind of `spawn_blocking` function:
```rust
impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        // We assume here that `spawn_blocking` can borrow from the current scope,
        // since our `async fn` is guaranteed to run to completion.
        spawn_blocking(|| {
            self.file.write_all(buffer)
        }).await
    }
}
```
This will definitely work!
But what about cancellation support? While at first glance it looks rather hard to add any cancellation support to this function, there are definitely some options:
With Option D), `CancellationToken`s can be passed between an async and a sync context by value or by reference. Therefore even the synchronous function can query whether cancellation is requested and take appropriate action. In the simplest case, where the blocking API we are calling does not have any dedicated cancellation support, we could still at least check for cancellation from time to time:
```rust
impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        // Get a reference to the current `CancellationToken`, which will then
        // be accessed from the synchronous function.
        std::task::with_current_cancellation_token(|token| {
            spawn_blocking(|| {
                let mut offset = 0;
                while offset < buffer.len() {
                    // In every loop iteration we check if cancellation is requested.
                    // This returns a `CancellationError` which will be propagated
                    // back through all layers.
                    token.error_if_cancelled()?;
                    offset += self.file.write(&buffer[offset..])?;
                }
                Ok(())
            })
        }).await
    }
}
```
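A runnable approximation of this periodic check using only the standard library is sketched below. It assumes a flag-only token that reports cancellation as an `io::ErrorKind::Interrupted` error (an arbitrary choice for this sketch), writes into any `std::io::Write`, and uses `std::thread::scope` as a stand-in for a `spawn_blocking` that can borrow from its caller. Capping each `write` call at `max_chunk` bytes bounds how much work happens between two checks.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Minimal token for this sketch (hypothetical, flag only).
#[derive(Default)]
struct CancellationToken(AtomicBool);

impl CancellationToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn error_if_cancelled(&self) -> io::Result<()> {
        if self.0.load(Ordering::SeqCst) {
            Err(io::Error::new(io::ErrorKind::Interrupted, "operation cancelled"))
        } else {
            Ok(())
        }
    }
}

/// Blocking write loop that checks the token before every partial write.
fn write_all_cancellable<W: Write>(
    out: &mut W,
    buffer: &[u8],
    max_chunk: usize,
    token: &CancellationToken,
) -> io::Result<usize> {
    let mut offset = 0;
    while offset < buffer.len() {
        token.error_if_cancelled()?;
        let end = (offset + max_chunk).min(buffer.len());
        let n = out.write(&buffer[offset..end])?;
        if n == 0 {
            return Err(io::ErrorKind::WriteZero.into());
        }
        offset += n;
    }
    Ok(offset)
}
```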
This can already reduce the time until a cancellation request is handled.
It could, however, be possible to reduce this time even further if the environment we are working in offers a way to cancel thread-blocking functions (which is possible by using OS signal mechanisms on some platforms). In that case we could have synchronous functions that accept a `CancellationToken` and unblock if cancellation is requested:
```rust
impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        std::task::with_current_cancellation_token(|token| {
            spawn_blocking(|| {
                // This synchronous file API accepts a `&CancellationToken`
                // or `&dyn CancellationToken` as argument.
                self.file.write_all(buffer, token)
            })
        }).await
    }
}
```
Or we could go even further: standardize a location in thread-local storage where `CancellationToken` references are stored, make the `spawn_blocking` function automatically propagate the reference, and let the blocking write API accept it implicitly.
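A minimal sketch of this thread-local approach, assuming a `thread_local!` slot as the standardized location and a hand-written `spawn_blocking` built on `std::thread::spawn` (not Tokio's function). The helper captures the caller's token and installs it on the new thread, so blocking code can query `current_is_cancelled()` without any explicit parameter. `with_token` restores the previous token afterwards, although this sketch does not do so if `f` panics.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Minimal cloneable token (hypothetical, flag only).
#[derive(Clone, Default)]
struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

thread_local! {
    // The assumed "standardized location" for the current token.
    static CURRENT_TOKEN: RefCell<Option<CancellationToken>> = RefCell::new(None);
}

/// Runs `f` with `token` installed as the current token on this thread.
fn with_token<R>(token: CancellationToken, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT_TOKEN.with(|t| t.replace(Some(token)));
    let result = f();
    CURRENT_TOKEN.with(|t| *t.borrow_mut() = previous);
    result
}

/// What a cancellation-aware blocking API could query implicitly.
fn current_is_cancelled() -> bool {
    CURRENT_TOKEN.with(|t| t.borrow().as_ref().map_or(false, |t| t.is_cancelled()))
}

/// `spawn_blocking`-like helper: captures the caller's token (or a fresh,
/// never-cancelled one) and re-installs it on the new thread.
fn spawn_blocking<R: Send + 'static>(
    f: impl FnOnce() -> R + Send + 'static,
) -> thread::JoinHandle<R> {
    let token = CURRENT_TOKEN.with(|t| t.borrow().clone()).unwrap_or_default();
    thread::spawn(move || with_token(token, f))
}
```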
What would our code look like then? It simply gets reduced back to:
```rust
impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        spawn_blocking(|| {
            self.file.write_all(buffer)
        }).await
    }
}
```
That's neat! The cancellation context would be forwarded without a single line of additional code. From here we could go even further and make sure that async code which gets launched from within synchronous code inherits its cancellation token, which gets us to a full Async - Sync - Async Sandwich 🥪:
```rust
impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        spawn_blocking(|| {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .build()
                .expect("failed to build runtime");
            runtime.block_on(async {
                spawn_blocking(|| {
                    // Yes, even here the original `CancellationToken` is visible
                    self.file.write_all(buffer)
                }).await
            })
        }).await
    }
}
```
With Option A), given there is no directly visible `CancellationToken` value or reference but only a callback, there is nothing we can directly forward to the synchronous function. We could set up some `CancellationToken`-like object which is made available to the synchronous function and let `request_cancellation` cancel it, but this seems more like a mixture of Option A) and D) than a pure variant of Option A).
With Option B), this is again similar to Option A). However, `poll_cancel()` would mostly be implemented by sending some kind of cancellation request to the synchronous function once, and then waiting for it to complete.
Now that we've looked at a variety of use-cases - what have we actually learned from them?
Here are some thoughts:
- The `CancellationToken` and `request_cancellation` methods often look fairly similar in behavior, probably because the `CancellationToken` also allows attaching a `request_cancellation`-like callback.
- The `CancellationToken` approach requires less code to be touched to add graceful cancellation support.
- The `request_cancellation` approach requires some extra thought on how to marshal the cancellation request into the executing task to prevent multi-threading issues. With `CancellationToken`s, the requirement is that users of `CancellationToken`s (who are going to attach a callback) have to make sure the cancellation path is thread-safe. That requirement might be relaxable if there exists only a single-threaded runtime and cancellation requests are only issued within this runtime, but this might be very tricky to express in the type system.
- Async/sync sandwiches are very natural to express using `CancellationToken`s. Other mechanisms don't seem to be such a good fit for this use case.
- `CancellationToken`s could even be made available for the current `Future` trait, and are not dependent on the introduction of a new trait.
- `async fn cancel()` seems mostly suitable for single-layer cancellation, and does not compose well across multiple layers.
- `poll_cancel()` seems like an interesting variant of `request_cancellation`, but leaves a few more questions open. The method is probably also underrepresented in this article, and would require a bit more research for the best possible comparison.
Overall, the `CancellationToken` approach looks like the most promising option.
It can deal with all use cases very well, it provides the means for using the same tool for graceful cancellation of both asynchronous and synchronous functions, and its application is already well understood due to widespread industry usage.
To use it in asynchronous Rust, however, some design choices still need to be figured out. We will look at them in more detail another time, but here is a short list:
- Should `CancellationToken`s be passed explicitly, or be implicitly available via `Context`?
- Are `CancellationToken`s `Send` or `Sync`?
- Are `CancellationToken`s `Clone`?
- Will users interact with a `struct CancellationToken`, or a `&dyn CancellationToken` where the implementation is defined by runtimes? Or would it even be a `struct CancellationToken` where the implementation is defined by runtimes, like `std::task::Waker` today?
- What about memory allocations required for `CancellationToken`s? What about no-std?
- How is synchronization handled for attaching and removing cancellation callbacks?
- Should there be one implicit `CancellationToken` made available per task by runtimes (similar to Java/Kotlin), or should users explicitly create the required tokens?
- And obviously the most important one: what would this mechanism actually be called?
  - `CancellationToken`
  - `StopToken`
  - `SomethingCompletelyDifferent`
Appendix: The `CancellationToken` approach is used in the following other environments:
- C# / .NET Framework: The .NET runtime uses the `CancellationToken` API extensively. Most asynchronous operations accept a `CancellationToken` as an optional parameter, which allows cancelling the operation.
- Go provides the `Context` type, which represents a mixture of a `CancellationToken` and a mechanism for task-local storage. `Context` parameters are passed along the call chain, so that code which is cancellation-aware is able to respond to cancellation requests. This happens by monitoring an embedded `Done` channel, which becomes readable once cancellation is requested.
- The C++ `std::jthread` type, which is part of C++20, represents a thread which joins on destruction. In order to prevent hanging threads and maximize the chance of dropped threads joining as soon as possible, the `jthread` ecosystem introduces `std::stop_token`, which is equivalent to a `CancellationToken`. Each `jthread` is associated with one `stop_token`. This token can either be cancelled explicitly by calling `jthread::request_stop`, or cancellation will be requested implicitly if the `jthread` handle is dropped before the thread has run to completion. The `stop_token` is passed directly as an argument to the thread function, and can from there be forwarded to other functions which support cancellation, e.g. the `stop_token`-aware wait functions of `std::condition_variable_any`. Interestingly, C++ uses a mixture of the C#/Go and Java/Kotlin styles: while code in C# and Go only ever explicitly interacts with a `CancellationToken` type that is fully decoupled from any particular thread/task instance, Java, Kotlin and C++ all associate one particular `CancellationToken` with an active thread/task. However, while Java and Kotlin don't provide any direct reference to this token (only its methods are available via thread-local accessors), C++ passes the object around explicitly.
- Kotlin does not have a public `CancellationToken` type. However, it embeds a variant of a `CancellationToken` inside its `Job`. Each `Job` (which is equivalent to a Rust task) can be requested to be cancelled by calling a `cancel()` function on the job handle. This changes the `Job`'s state to cancelled. Code running inside the `Job` can query the current state using the `isActive` property, or can get notified via a callback that is invoked once cancellation is triggered. Semantically, that is pretty much equivalent to a unique `CancellationToken` instance embedded inside the `Job` state.
- Java is similar to Kotlin, except that its cancellation state is per thread and not per task. Java uses the `Thread.interrupt()` mechanism to request threads to stop. Code inside the thread can use `Thread.interrupted()` to check for cancellation, and many blocking functions are more directly integrated with the interruption mechanism and will unblock and throw an `InterruptedException` once interruption is requested. That again is conceptually similar to embedding a single `CancellationToken` inside the thread's state.