@Matthias247
Last active January 17, 2022 22:50
A case for CancellationTokens


Background

The Rust async working group is currently actively discussing ways to improve async/await. Niko Matsakis documented the main goals and ideas in the async vision document.

As part of the improved async ecosystem, users should be able to make use of scoped tasks, where tasks can hold references to parent task data. Users should also be able to wrap operations inside async fns which, up to now, have not been a great fit for async fns. Examples of the latter are completion-based OS APIs (IOCP, io-uring) or async APIs that are implemented in other languages (like C++ boost-asio callbacks).

These new APIs require async tasks to run to completion, which means that dropping the associated Futures is no longer a valid way to cancel an active operation.

Supporting cancellation is, however, still an important requirement. For example, user interfaces often have a Cancel button which should promptly cancel any ongoing operation when pressed, and web APIs (e.g. one executing a complex database query) should stop their work if the client disconnects.

In order to support those use-cases, graceful/cooperative cancellation models are proposed.

This document lists the proposed models, and showcases why CancellationTokens seem the most promising path for supporting graceful cancellation.

Graceful cancellation models

Let's start by looking at which models have been proposed so far:

A) fn request_cancellation()

Async Shiny Future Design Doc Sketches and Notes by Niko Matsakis describes a new Async trait (variant of Future trait which guarantees running to completion).

This trait contains a request_cancellation() function. Calling this function initiates cancellation, while the regular .awaiting of the Async would be used to wait for the Async to complete. The Async might in this case either resolve to some kind of CancellationError, or yield any other result (e.g. in case the operation finished before the cancellation was processed, or if the Async type does not support cancellation).

Such an Async type could look like:

trait Async {
    type Output;

    /// # Unsafe conditions
    ///
    /// * Once polled, cannot be moved
    /// * Once polled, destructor must execute before memory is deallocated
    /// * Once polled, must be polled to completion
    unsafe fn poll(
        &mut self,
        context: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output>;

    /// Requests cancellation; poll must still be called.
    fn request_cancellation(
        &mut self
    );
}
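To make the contract of request_cancellation() concrete, here is a toy sketch using only std. `ToyAsync`, `DelayedOp`, `Cancelled` and `drive` are hypothetical names, and `poll` is a safe method here, so the run-to-completion rules are only documented, not enforced. Requesting cancellation does no waiting by itself; the operation still has to be polled until it reports a final result.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for the `Async` trait above. `poll` is a
/// safe method here, so the run-to-completion rules are documented, not enforced.
trait ToyAsync {
    type Output;
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>;
    /// Requests cancellation; `poll` must still be called until it returns `Ready`.
    fn request_cancellation(&mut self);
}

#[derive(Debug, PartialEq)]
struct Cancelled;

/// Toy operation that finishes after `remaining` further polls.
struct DelayedOp {
    remaining: u32,
    cancel_requested: bool,
}

impl ToyAsync for DelayedOp {
    type Output = Result<u32, Cancelled>;

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            // Finished before (or despite) a cancellation request.
            return Poll::Ready(Ok(42));
        }
        if self.cancel_requested {
            // The request is only acted upon during the next poll.
            return Poll::Ready(Err(Cancelled));
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }

    fn request_cancellation(&mut self) {
        self.cancel_requested = true;
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls an operation until it produces its final result.
fn drive<A: ToyAsync>(op: &mut A) -> A::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = op.poll(&mut cx) {
            return out;
        }
    }
}
```

An operation with work left resolves to `Err(Cancelled)` after a request, while one that had already finished still yields `Ok(42)`, matching the "any other result" case described above.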

B) fn poll_cancel()

The completion crate written by Kai Jewsen, based on the original proposal to support run-to-completion async functions, proposes a poll_cancel() function which is called instead of the normal poll function once an async operation is cancelled.

pub trait CompletionFuture {
    /// The type of value produced on completion.
    type Output;

    /// Attempt to resolve the future to a final value, registering the current task for wakeup if
    /// the value is not yet available.
    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

    /// Attempt to cancel the future, registering the current task for wakeup if it has not finished
    /// cancelling yet.
    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}

C) async fn cancel()

tokio-uring proposes an approach where Futures provide an explicit async fn cancel(), which can be used to request that an ongoing operation stop and to wait for it to complete:

// `mut` is required to poll the future via `&mut accept` below
let mut accept = tcp_listener.accept();

tokio::select! {
    (tcp_stream, addr) = &mut accept => { ... }
    _ = tokio::time::sleep(Duration::from_millis(100)) => {
        // Accept timed out, cancel the in-flight accept.
        match accept.cancel().await {
            Ok(_) => { ... } // operation canceled gracefully
            Err(Completed(tcp_stream, addr)) => {
                // The operation completed between the timeout
                // and cancellation request.
            }
            Err(_) => { ... }
        }
    }
}

D) CancellationTokens

A CancellationToken is an object which provides the ability to query whether cancellation of a task has been requested. CancellationTokens are passed along the call chain. Only code that needs to check whether a task is cancelled interacts with them.

Additionally, CancellationTokens often provide the ability to register callbacks that get executed once cancellation is requested.

CancellationTokens are used to provide graceful cancellation support in lots of other programming languages. The Appendix provides more details about how other languages use CancellationTokens or variants of those.
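A minimal sketch of the query side of such a token, assuming nothing beyond std: `CancellationToken`, `CancellationError` and `process_items` are hypothetical names, callbacks are left out, and clones share one atomic flag so that every holder observes a request made through any clone.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal, hypothetical token: cloning shares the same flag, so every clone
/// observes a cancellation requested through any other clone.
#[derive(Clone, Default)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

#[derive(Debug, PartialEq)]
struct CancellationError;

impl CancellationToken {
    fn new() -> Self {
        Self::default()
    }

    /// Requests cancellation. Calling it more than once has no extra effect.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Convenience helper for use with the `?` operator.
    fn error_if_cancelled(&self) -> Result<(), CancellationError> {
        if self.is_cancelled() { Err(CancellationError) } else { Ok(()) }
    }
}

/// Only code that actually wants to react to cancellation looks at the token.
fn process_items(items: &[u32], token: &CancellationToken) -> Result<u32, CancellationError> {
    let mut sum = 0;
    for item in items {
        token.error_if_cancelled()?;
        sum += item;
    }
    Ok(sum)
}
```

Code between the canceller and `process_items` would only have to pass the token along (or, with implicit forwarding, not even that).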

The problem statement: Let's copy some files

In order to better understand how the different cancellation mechanisms would work in a real application, we will look at a realistic use-case: We want to implement an asynchronous function which copies a file from a source location to a destination, where all reads and writes of file data are asynchronous - e.g. both files might be stored using a cloud-based object storage service.

This function might be part of a web service, and should be cancelled if the user aborts the request.

A basic implementation could look like:

async fn copy_file(src_location: Uri, dest_location: Uri) -> Result<(), CopyError> {
    // Create client objects for interacting with remote file storage.
    // This will also connect to the remote locations
    let mut src_stream = FileClient::open_read(src_location).await?;
    let mut dest_stream = FileClient::open_write(dest_location).await?;

    let mut buffer = [0u8; 64 * 1024];

    loop {
        let n = src_stream.read(&mut buffer[..]).await?;
        if n == 0 {
            // We copied all data
            break;
        }
        dest_stream.write_all(&buffer[..n]).await?;
    }

    Ok(())
}

Going deeper

The read() and write() functions might be implemented using a completion-based IO framework. Assuming we use io-uring to build the client, the requirements would be:

  • Starting the operation is performed by queuing an IORING_OP_READ operation.
  • If the operation is cancelled, an IORING_OP_ASYNC_CANCEL operation needs to be submitted to the kernel.
  • Receiving a kernel notification that either the read or the cancel operation has finished needs to unblock the read() operation.
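These three requirements can be pictured as a small state machine. The following std-only sketch does not talk to io-uring at all: `Completion`, `ReadState` and `ReadOp` are hypothetical, and the kernel's completion queue is simulated by calling `on_completion` directly. It illustrates why read() must keep waiting after a cancel has been submitted: the kernel may still report the original read as finished. (The cancel request also produces a completion of its own, which this sketch ignores.)

```rust
/// What the (simulated) kernel reports for a submitted read.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Completion {
    /// The read finished and transferred this many bytes.
    Read(usize),
    /// The read was aborted by an IORING_OP_ASYNC_CANCEL request.
    Cancelled,
}

#[derive(Debug, PartialEq)]
enum ReadState {
    NotSubmitted,
    /// IORING_OP_READ queued; waiting for its completion.
    InFlight,
    /// IORING_OP_ASYNC_CANCEL queued too; still waiting for the read's completion.
    CancelRequested,
    Done(Completion),
}

struct ReadOp {
    state: ReadState,
}

impl ReadOp {
    fn new() -> Self {
        ReadOp { state: ReadState::NotSubmitted }
    }

    fn submit(&mut self) {
        assert_eq!(self.state, ReadState::NotSubmitted);
        self.state = ReadState::InFlight; // would push an IORING_OP_READ entry
    }

    /// Returns true if a cancel request had to be submitted.
    fn request_cancel(&mut self) -> bool {
        if self.state == ReadState::InFlight {
            self.state = ReadState::CancelRequested; // would push IORING_OP_ASYNC_CANCEL
            true
        } else {
            false
        }
    }

    /// Called when the read's completion arrives; this is what unblocks read().
    fn on_completion(&mut self, completion: Completion) {
        match self.state {
            ReadState::InFlight | ReadState::CancelRequested => {
                self.state = ReadState::Done(completion)
            }
            _ => panic!("unexpected completion"),
        }
    }

    /// `Some` once the operation has really finished, successfully or not.
    fn result(&self) -> Option<Completion> {
        match self.state {
            ReadState::Done(c) => Some(c),
            _ => None,
        }
    }
}
```

Even after `request_cancel()` returns true, `result()` stays `None` until a completion arrives, and that completion may be a successful read.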

And looking further up the call chain

The function can be called from a web API handler like:

async fn handle_request(request: Request) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;

    copy_file(copy_file_request.src(), copy_file_request.dest()).await?;
    Ok(Response::ok())
}

In this case, the handle_request() function would be cancelled by the webserver framework.

For options A), B) and C), this means the Future generated for handle_request() would have some variant of a cancellation method. That method then needs to delegate cancellation to inner Futures, like the ones generated for FileClient::read and FileClient::write, which in turn have to submit the necessary cancellation operation to io-uring and wait for the result.

For option D), the handle_request function would carry an additional CancellationToken, either as an explicit parameter or as a hidden/implicit parameter (part of the async Context).

Evaluating the options

Now that we've defined the problem statement, let's look at how the various options would work out here:

Option A) fn request_cancellation()

This option implies that handle_request, copy_file and FileClient::{open_read, open_write, read, write} would all generate Future/Async objects which contain a request_cancellation() function.

For FileClient::read, request_cancellation would be implemented by submitting the IORING_OP_ASYNC_CANCEL operation.

For copy_file the request_cancellation function would need to be compiler generated. It should dispatch to the request_cancellation of whichever Future is active at the moment.

impl CompilerGeneratedCopyFileFuture {
    fn request_cancellation(&mut self) {
        match self.state { // This indicates the await point that had been reached
            0 => return, // Not polled
            1 => self.open_read_future.request_cancellation(), // Waiting on FileClient::open_read
            2 => self.open_write_future.request_cancellation(), // Waiting on FileClient::open_write
            3 => self.read_future.request_cancellation(), // Waiting on FileClient::read
            4 => self.write_future.request_cancellation(), // Waiting on FileClient::write
            _ => unreachable!(),
        }
    }
}

This looks reasonable and doable at first glance. However, it requires that we can reliably identify the current execution state of a Future and call the respective method on the sub-Future. We will see later on why this can be problematic.

Option B) fn poll_cancel()

The implementation of this method would look rather similar to Option A). The main difference is that polling the original Future stops, and a cancel path is entered instead.

One immediately visible difference is that in this case the caller would no longer get any indication of whether the cancellation was successful and the method stopped executing early, or whether the cancellation had no effect: poll_cancel() returns Poll<()>, so there is no return value which could indicate this.
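The missing feedback can be seen in a toy model. This sketch assumes a simplified, safe `ToyCompletionFuture` trait without `Pin` (hypothetical, not the completion crate's API) and a toy `Op`: an operation whose work is aborted and one whose work had already finished both end their cancel path with the same `Poll::Ready(())`, and the finished operation's result is never delivered.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `CompletionFuture` (safe methods, no `Pin`).
trait ToyCompletionFuture {
    type Output;
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>;
    fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

/// Toy operation whose work may already be finished when cancellation starts.
struct Op {
    finished: bool,
    cleanup_polls: u32,
}

impl ToyCompletionFuture for Op {
    type Output = u32;

    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.finished { Poll::Ready(7) } else { Poll::Pending }
    }

    fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // Whether the work was aborted or had already completed, the caller
        // only ever sees `Ready(())`; a finished result is silently lost.
        if self.cleanup_polls == 0 {
            return Poll::Ready(());
        }
        self.cleanup_polls -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drives the cancel path to completion and returns how many polls it took.
fn cancel_to_completion<F: ToyCompletionFuture>(fut: &mut F) -> u32 {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 1;
    while fut.poll_cancel(&mut cx).is_pending() {
        polls += 1;
    }
    polls
}
```

Both an aborted `Op { finished: false, .. }` and a completed `Op { finished: true, .. }` end in the same `()`, so the caller cannot distinguish them.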

Option C) async fn cancel()

Option C) does not immediately seem to offer a mechanism to perform the delegation of a cancellation request from handle_request down to the FileClient operations.

async fn cancel() on the io-uring primitives is implemented manually and not compiler-generated. It is also not part of any interface/trait. Therefore, even if it were available on the low-level io-uring APIs, the cancellation capability would not be forwarded through the async fns copy_file and handle_request, which are not hand-written Futures.

This means while this variant might work for graceful cancellation deep within a particular framework, it won't compose very well in bigger applications. Another mechanism, like Option A), B) or D) would be required for this task.

Due to this limitation, we will continue the further comparison without Option C).

Option D) CancellationTokens

If we assume a CancellationToken would be passed implicitly as part of Context, no changes would be required for compiler-generated Futures. The token would automatically be forwarded from handle_request to copy_file to FileClient::{open_read, open_write, read, write}.

Assuming the FileClient SDK is built on something like tokio-uring and delegates to its read/write operations, even the FileClient SDK does not have to be modified.

The only methods that need to become cancellation-aware would be the tokio-uring IO implementations.

A very primitive cancellation support in such an IO framework could look like:

impl IoHandle {
    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> {
        // Return an `std::task::CancellationError` in case the task is cancelled,
        // which is convertible into an `std::io::Error`
        std::task::with_current_cancellation_token(|token| {
            token.error_if_cancelled()
        })?;

        // Perform the actual IO implementation
        self.inner.read(buffer).await
    }
}

This means that at the start of each IO operation, we check once whether the current task is cancelled; if it is, we skip the operation and return an error.

This is trivial to implement, but has the downside that cancellation has no effect on an operation that is already in progress.
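The trade-off can be reproduced synchronously. In this sketch (hypothetical names, with an `AtomicBool` standing in for the token), the `io` closure plays the in-flight operation: the flag is checked once before it starts, so a request arriving while it runs is only noticed by the next call.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum ReadError {
    Cancelled,
}

/// Checks the (stand-in) cancellation flag once, then runs the IO to completion.
/// The `io` closure represents the in-flight operation and never sees the flag.
fn read_checked_once(cancelled: &AtomicBool, io: impl FnOnce() -> usize) -> Result<usize, ReadError> {
    if cancelled.load(Ordering::Acquire) {
        return Err(ReadError::Cancelled);
    }
    Ok(io())
}
```

If the flag is set while `io` is running, that call still returns `Ok`, and only the following call fails with `ReadError::Cancelled`.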

A better implementation should react immediately to cancellation requests, and thereby provide both lower latencies and higher assurance that cancellation actually happens.

As discussed earlier, for io-uring we would need to submit an explicit IORING_OP_ASYNC_CANCEL operation to the OS to cancel an ongoing operation.

Typical CancellationToken implementations allow this by letting users attach callbacks that are executed immediately when cancellation is requested. They are somewhat similar to Option A) fn request_cancellation(), but are only used in the parts of the code which require special cancellation handling.

impl IoHandle {
    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> {
        // Resolves to the read result, or to an error (convertible from
        // `std::task::CancellationError`) if the cancellation took effect.
        std::task::with_current_cancellation_token(|token| async {
            let op = Mutex::new(UringOp::read(self.fd, buffer));

            // Attach a callback that will be immediately executed when the operation
            // is cancelled. The callback will be deregistered once the guard
            // returned by `when_cancelled` is dropped. It can therefore reference
            // objects on the "async stack" and can be added to an intrusive list
            // of cancellation callbacks - requiring no memory allocation.
            // This is similar to C++ `stop_callback`s
            // (https://en.cppreference.com/w/cpp/thread/stop_callback)
            // The guard needs a named binding: `let _ = ...` would drop it
            // immediately and thereby deregister the callback right away.
            let _cancel_guard = token.when_cancelled(|| {
                let mut op = op.lock().unwrap();

                if !op.is_active() {
                    return;
                }

                // Submits an IORING_OP_ASYNC_CANCEL operation
                op.cancel();
            });

            // Start the async operation
            {
                let mut op = op.lock().unwrap();
                op.submit();
            }

            // Wait for the result. For simplification purposes we assume
            // there is another async function that allows us to wait on this
            // without holding the lock (e.g. through a oneshot channel,
            // `notify()` handle or `ManualResetEvent`).
            op.wait().await;

            let op = op.lock().unwrap();
            op.result()
        }).await
    }
}

This code snippet is simplified, and a real implementation might require a hand-written state machine here rather than an async function. However, it should convey the basic idea of how support for timely cancellation can be added using additional callbacks.
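The callback registration itself can be sketched with std only. This is a hypothetical design, not an existing API. Unlike the intrusive, allocation-free list described in the code comment above, it stores boxed `'static` callbacks in a `Mutex<HashMap>`. `when_cancelled` returns a guard that deregisters the callback when dropped, a callback registered after cancellation runs immediately, and callbacks run outside the lock so they may use the token themselves.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Callback = Box<dyn FnOnce() + Send>;

#[derive(Default)]
struct State {
    cancelled: bool,
    next_id: u64,
    callbacks: HashMap<u64, Callback>,
}

/// Hypothetical token supporting cancellation callbacks.
#[derive(Clone, Default)]
struct CancellationToken {
    state: Arc<Mutex<State>>,
}

/// Deregisters its callback when dropped.
struct CallbackGuard {
    state: Arc<Mutex<State>>,
    id: u64,
}

impl Drop for CallbackGuard {
    fn drop(&mut self) {
        self.state.lock().unwrap().callbacks.remove(&self.id);
    }
}

impl CancellationToken {
    fn is_cancelled(&self) -> bool {
        self.state.lock().unwrap().cancelled
    }

    /// Runs `f` once cancellation is requested (immediately if it already was).
    #[must_use = "dropping the guard deregisters the callback"]
    fn when_cancelled(&self, f: impl FnOnce() + Send + 'static) -> CallbackGuard {
        let mut state = self.state.lock().unwrap();
        let id = state.next_id;
        state.next_id += 1;
        if state.cancelled {
            drop(state); // never call user code while holding the lock
            f();
        } else {
            state.callbacks.insert(id, Box::new(f));
        }
        CallbackGuard { state: Arc::clone(&self.state), id }
    }

    fn cancel(&self) {
        let callbacks: Vec<Callback> = {
            let mut state = self.state.lock().unwrap();
            if state.cancelled {
                return;
            }
            state.cancelled = true;
            state.callbacks.drain().map(|(_, cb)| cb).collect()
        };
        // Run the callbacks without holding the lock.
        for cb in callbacks {
            cb();
        }
    }
}
```

Binding the guard with `let _ = token.when_cancelled(...)` would drop it right away and deregister the callback, so guards should be kept in a named variable such as `_guard` for as long as the callback is needed.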

Explicit CancellationToken passing

If the token were not provided via Context, functions like handle_request and copy_file would have to be modified as follows to become cancellation-aware:

async fn handle_request(
    request: Request,
    cancel_token: CancellationToken,
) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;

    copy_file(copy_file_request.src(), copy_file_request.dest(), cancel_token).await?;
    Ok(Response::ok())
}

We will take a look at the pros and cons of this approach in a follow-up article.

Benefits of the cancellation token

The main benefit of CancellationTokens, as shown, is that they are extremely composable. Most code either does not have to be cancellation-aware at all (if the token is implicitly forwarded), or is only responsible for forwarding the token. The trickier parts of cancellation support only have to be added at the IO library layer, which means the majority of people will not have to actively worry about it. A typical application author would only have to:

  • Handle CancellationErrors returned from a cancelled function
  • Forward cancellation tokens manually, if they are not implicitly forwarded
  • Call CancellationToken::cancel() or e.g. TaskHandle::cancel() to trigger cancellation of subtasks.

Additional scenarios and use-cases

Now that we have looked at how the cancellation strategies integrate into the basic service implementation, let's add a few new requirements and see how each strategy copes with them:

Allow the copy operation to run to completion

Up to now we have built a copy_file operation which unconditionally aborts as soon as cancellation is initiated. This is great from a determinism point of view: if users of the service abort transactions, those won't silently continue to run in the background and consume resources there.

However, there might be reasons for relaxing this requirement. Let's imagine we own this service, and one day learn that one of our key customers is not able to use the copy_file API. The reason is that they have configured their HTTP client timeout to 30s and cannot easily change it, since the client is an embedded device whose source code has been lost. Over time, however, the files they need to copy have grown, and a copy now takes 40s.

To help this customer, we want to slightly modify the copy_file operation and allow it to run to completion once at least 70% of the file has been copied.

Option D) CancellationTokens

When using CancellationTokens - we can achieve this goal by overriding the CancellationToken passed to functions with one that never signals a cancellation request and thereby breaks the automatic cancellation chain. This could look like:

async fn copy_file(src_location: Uri, dest_location: Uri) -> Result<(), CopyError> {
    // Create client objects for interacting with remote file storage.
    // This will also connect to the remote locations
    let mut src_stream = FileClient::open_read(src_location).await?;
    let mut dest_stream = FileClient::open_write(dest_location).await?;

    let file_size = src_stream.len() as u64;
    let mut buffer = [0u8; 64 * 1024];
    let mut copied = 0u64;

    // This part of the copy is cancellable. The 70% threshold is checked as
    // `copied * 100 < file_size * 70` to avoid dividing by zero for empty files.
    while copied * 100 < file_size * 70 {
        let n = src_stream.read(&mut buffer[..]).await?;
        if n == 0 {
            break; // End of file reached early
        }
        dest_stream.write_all(&buffer[..n]).await?;
        copied += n as u64;
    }

    // Replaces the task local cancellation-token with one which never signals
    // cancellation. Alternatively, for explicit cancellation-token passing,
    // pass a new token to functions instead of forwarding the existing one.
    std::task::uncancellable(async {
        while copied < file_size {
            let n = src_stream.read(&mut buffer[..]).await?;
            if n == 0 {
                break;
            }
            dest_stream.write_all(&buffer[..n]).await?;
            copied += n as u64;
        }
        Ok::<(), CopyError>(())
    }).await?;

    Ok(())
}
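For explicitly passed tokens, the same effect can be modelled synchronously. In this sketch, `CancellationToken`, `CancellationToken::never`, `copy_with_cutoff` and the `on_chunk` hook are hypothetical; the hook only exists to simulate a client disconnecting mid-copy. The request token is consulted during the first 70% and a fresh, never-cancelled token afterwards.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal stand-in token.
#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        CancellationToken { cancelled: Arc::new(AtomicBool::new(false)) }
    }
    /// A fresh token that is never handed to anyone who could cancel it.
    fn never() -> Self {
        Self::new()
    }
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

#[derive(Debug, PartialEq)]
enum CopyError {
    Cancelled { copied: u64 },
}

/// Copies `file_size` bytes in `chunk`-sized steps; `on_chunk` runs after each step.
fn copy_with_cutoff(
    file_size: u64,
    chunk: u64,
    request_token: &CancellationToken,
    mut on_chunk: impl FnMut(u64),
) -> Result<u64, CopyError> {
    let mut copied = 0;
    // Cancellable phase: up to 70% of the file, observing the request's token.
    while copied * 100 < file_size * 70 {
        if request_token.is_cancelled() {
            return Err(CopyError::Cancelled { copied });
        }
        copied = (copied + chunk).min(file_size);
        on_chunk(copied);
    }
    // Uncancellable phase: the request's token is no longer forwarded.
    let token = CancellationToken::never();
    while copied < file_size {
        if token.is_cancelled() {
            return Err(CopyError::Cancelled { copied });
        }
        copied = (copied + chunk).min(file_size);
        on_chunk(copied);
    }
    Ok(copied)
}
```

A disconnect at 50% aborts the copy, while a disconnect at 80% is ignored and the copy runs to completion.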

Option A) request_cancellation()

The way to prevent the cancellation request from being forwarded seems to be inserting a custom hand-written Future/Async into the call chain whose request_cancellation is a no-op.

E.g.

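/// Wraps an `Async` in another `Async` which prevents cancellation requests
/// from flowing forward.
struct Uncancellable<F>(F);

fn uncancellable<F: Async>(inner: F) -> Uncancellable<F> { Uncancellable(inner) }

impl<F: Async> Async for Uncancellable<F> {
    type Output = F::Output;
    // Polling is forwarded to the wrapped operation
    unsafe fn poll(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> { self.0.poll(cx) }
    // Cancellation requests are swallowed instead of being forwarded
    fn request_cancellation(&mut self) {}
}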

The actual copy_file method would then look equivalent to Option D).

Option B) fn poll_cancel()

The solution here seems similar to Option A), but with the poll_cancel() function of the Uncancellable wrapper continuing to call the normal poll function of the wrapped Future.

Allow to perform multiple copies at once

Up to now our service provided an API to copy a single file at a time. But why limit ourselves to that, when we can offer users the ability to perform multiple copies at once? That would save the latency of calling the copy API multiple times [1].

[1] In reality the decision is not that easy, because the multi-copy API makes error handling more complicated and multiplexing protocols like HTTP/2 can offset latency gains - but it makes a good example.

To allow this, we modify handle_request to spawn a subtask per copy and wait for all copies to complete. Here we use the scope/task_set API proposed by Carl Lerche in Exploring ways to make async Rust easier. This API allows us to run fully independent tasks for each copy.

async fn handle_request(
    request: Request,
    cancel_token: CancellationToken,
) -> Result<Response, ApiError> {
    let copy_file_request = CopyFileRequest::try_from(request)?;

    std::task::scope(async |scope| {
        let mut task_set = scope.task_set();

        // Start a subtask for each copy operation
        for (src, dest) in copy_file_request.files().iter() {
            task_set.spawn(async move {
                copy_file(src.clone(), dest.clone()).await
            });
        }

        // If one of the tasks finishes with an error, awaiting the `TaskSet` will
        // yield the error and lead us to return from the scope and function
        // due to the use of the `?` operator. This will cancel all other outstanding
        // tasks.
        while let Some(()) = task_set.next().await? {
            // copying requires no result
        }

        Ok(())
    }).await?;

    Ok(Response::ok())
}

Offering this API was super easy! But how do the various cancellation systems fit into it?

Option D) CancellationTokens

We will again start by looking at the CancellationToken-based solution, since the integration is most obvious:

We will use the ability to attach cancellation callbacks to the current CancellationToken when entering the scope, and cancel all tasks which are part of the scope by calling a .cancel() function on their task handles. That .cancel() function might either be a framework-internal function, or could be part of the public TaskHandle API. As a result of that cancellation request, the CancellationToken which is referenced inside the child tasks will transition into the cancelled state.

The implementation would therefore look similar to:

async fn scope<F, R>(scope_fn: F) -> R
where F: AsyncFnOnce(ScopeHandle) -> R
{
    let scope_state = Mutex::new(ScopeState::new());

    std::task::with_current_cancellation_token(|token| async {
        // The guard must stay alive for the whole scope: `let _ = ...` would
        // drop it, and thereby deregister the callback, immediately.
        let _cancel_guard = token.when_cancelled(|| {
            // When the current task is cancelled, send a cancellation request to
            // all child tasks.
            let state = scope_state.lock().unwrap();
            for child_task in state.tasks() {
                child_task.cancel();
            }
        });

        let scope_handle = scope_state.lock().unwrap().handle();
        // Call the provided function
        let result = scope_fn(scope_handle).await;

        // Cancel all remaining subtasks
        {
            let state = scope_state.lock().unwrap();
            for child_task in state.tasks() {
                child_task.cancel();
            }
        }
        // (Since subtasks may borrow from the parent, a complete implementation
        // would also wait here until all of them have finished.)

        // And return the result
        result
    }).await
}
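The forwarding performed by scope effectively builds a tree of tokens. The following std-only sketch (a hypothetical design; tokio-util's `CancellationToken::child_token` offers a comparable parent/child relationship) makes that tree explicit: cancelling a token cancels all of its descendants but never its parent, and a child created from an already-cancelled token starts out cancelled. For brevity it never prunes finished children.

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Inner {
    cancelled: bool,
    children: Vec<CancellationToken>,
}

/// Hypothetical token where each scope or subtask gets its own child token.
#[derive(Clone, Default)]
struct CancellationToken {
    inner: Arc<Mutex<Inner>>,
}

impl CancellationToken {
    fn child(&self) -> CancellationToken {
        let child = CancellationToken::default();
        let mut inner = self.inner.lock().unwrap();
        if inner.cancelled {
            // A child of an already-cancelled token starts out cancelled.
            child.inner.lock().unwrap().cancelled = true;
        } else {
            inner.children.push(child.clone());
        }
        child
    }

    fn cancel(&self) {
        let children = {
            let mut inner = self.inner.lock().unwrap();
            if inner.cancelled {
                return;
            }
            inner.cancelled = true;
            std::mem::take(&mut inner.children)
        };
        // Propagate downwards without holding our own lock.
        for child in children {
            child.cancel();
        }
    }

    fn is_cancelled(&self) -> bool {
        self.inner.lock().unwrap().cancelled
    }
}
```

Cancelling a single subtask leaves its siblings, the scope and the request untouched, while cancelling the request reaches every subtask of the scope.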

Option A) request_cancellation()

In this option, the scope function would likely need to be a hand-written Future/Async whose implementation forwards the request_cancellation call to all subtasks:

impl ScopeFuture {
    fn request_cancellation(&mut self) {
        let scope_state = self.scope_state.lock().unwrap();
        for child_task in scope_state.tasks() {
            child_task.request_cancellation();
        }
    }
}

Multithreading issues, here we come 🧵🧵🧵

While this seems reasonable, there is a problem with this implementation as soon as multi-threading gets involved. In this case the ScopeFuture::request_cancellation method cannot directly call CompilerGeneratedCopyFileFuture::request_cancellation, because that future is potentially being polled on a different thread at the same time. While the cancellation of the scope is ongoing, the execution state of CompilerGeneratedCopyFileFuture might change, and with it the currently active Future/Async to which request_cancellation needs to be dispatched. To reliably identify the sub-future to which the cancellation call needs to be dispatched, the execution of the subtasks needs to be paused.

Options to do this are:

  1. Insert additional synchronization points on each .await point (state transition) to allow reliable access to the currently active sub-Future. This would be rather bad for performance, since now each .await would come with a mandatory compiler-inserted lock.

  2. Queue the request_cancellation call onto the thread which actually executes the task. The actual call would happen later in the context of the task, while the task is guaranteed not to be executing. Since cancellation is a binary state, this could likely be done by just setting a flag on the task state, queuing the task for execution, and inserting a check for that flag before the task's Future is polled:

    impl ScopeFuture {
        fn request_cancellation(&mut self) {
            let scope_state = self.scope_state.lock().unwrap();
            for child_task in scope_state.tasks() {
                // Only touches thread-safe task state, not the child's Future
                child_task.mark_cancelled();
                child_task.waker().wake_by_ref();
            }
        }
    }
    
    impl Task {
        unsafe fn poll(
                &mut self,
                context: &mut core::task::Context<'_>,
            ) -> core::task::Poll<Self::Output>
        {
            // Runs on the thread executing the task, while its Future is not being polled
            if self.marked_cancelled() {
                self.future.request_cancellation();
            }
            self.future.poll(context)
        }
    }

This method is reasonable. However, it requires the implementation of the scope to be aware of some executor/runtime details, or additional APIs to be specified which allow for deferred cancellation.
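Option 2 can be simulated with threads. In this sketch, a dedicated thread plays the executor, an `mpsc` channel plays the waker, and `Operation`, `TaskShared`, `run_task`, `spawn_task` and `cancel_task` are hypothetical names. The cancelling side only sets an atomic flag and wakes the task; `request_cancellation` is invoked on the thread that owns the operation, between polls, so the operation itself needs no synchronization.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical operation owned by a single worker thread.
struct Operation {
    cancel_requested: bool,
    steps_left: u32,
}

impl Operation {
    fn request_cancellation(&mut self) {
        self.cancel_requested = true;
    }
    /// One poll: returns `Some(result)` once finished.
    fn poll(&mut self) -> Option<Result<(), &'static str>> {
        if self.cancel_requested {
            return Some(Err("cancelled"));
        }
        if self.steps_left == 0 {
            return Some(Ok(()));
        }
        self.steps_left -= 1;
        None
    }
}

/// State shared between the task and whoever wants to cancel it.
struct TaskShared {
    marked_cancelled: AtomicBool,
}

/// Executor loop: every message on `wakeups` triggers one poll.
fn run_task(shared: Arc<TaskShared>, wakeups: mpsc::Receiver<()>) -> Result<(), &'static str> {
    let mut op = Operation { cancel_requested: false, steps_left: u32::MAX };
    for () in wakeups {
        // Translate the flag into a call on the owning thread, between polls.
        if shared.marked_cancelled.load(Ordering::Acquire) {
            op.request_cancellation();
        }
        if let Some(result) = op.poll() {
            return result;
        }
    }
    Err("all wakers dropped")
}

/// Spawns the task; returns the "waker" used to schedule polls and the join handle.
fn spawn_task(shared: Arc<TaskShared>) -> (mpsc::Sender<()>, thread::JoinHandle<Result<(), &'static str>>) {
    let (wake, wakeups) = mpsc::channel();
    (wake, thread::spawn(move || run_task(shared, wakeups)))
}

/// The cancelling side (e.g. a scope) only touches thread-safe state.
fn cancel_task(shared: &TaskShared, wake: &mpsc::Sender<()>) {
    shared.marked_cancelled.store(true, Ordering::Release);
    let _ = wake.send(()); // the task may already have finished
}
```

However the two threads interleave, the task observes the flag on its next poll and finishes with `Err("cancelled")`.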

Option B) fn poll_cancel()

The same multi-threading challenges as in Option A) would apply. However, queuing the cancellation request to happen later inside the task context would not work: the poll_cancel() method needs to perform work immediately. Therefore it is not obvious at the moment what the implementation would look like.

Async - Sync - Async Sandwiches 🥪

So far we assumed that all parts of our service can be naturally expressed using async functions and IO. This is however not always the case. Sometimes no async functions are available, and we have to fall back to regular synchronous functions.

Async runtimes (like tokio and async-std in Rust) commonly dispatch those operations to a threadpool to avoid preventing other async tasks from running while the potentially long-running thread-blocking operation is executed.

Tokio offers the spawn_blocking function for this purpose.

Now let's assume our service in some cases does not perform network file copy operations, but actually reads or writes files from a local drive, and we don't have any async file IO API available. In that case we definitely want to dispatch to synchronous code using some kind of spawn_blocking function.

impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        // We assume here that `spawn_blocking` can borrow from the current scope,
        // since our `async fn` is guaranteed to run to completion.
        spawn_blocking(|| {
            self.file.write_all(buffer)
        }).await
    }
}

This will definitely work!

But what about cancellation support? While at first glance it looks hard to add any cancellation support to this function, there are some options:

Option D) CancellationTokens

CancellationTokens can be forwarded between an async and a sync context by value or by reference. Therefore even the synchronous function could query whether cancellation is requested and act accordingly. In the simplest case, where the blocking API we are calling does not have any dedicated cancellation support, we could still at least check for cancellation from time to time:

impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        // Get a reference to the current `CancellationToken`, which will then
        // be accessed from the synchronous function.
        std::task::with_current_cancellation_token(|token| {
            spawn_blocking(|| {
                let mut offset = 0;
                while offset < buffer.len() {
                    // In every loop iteration we check if cancellation is requested.
                    // This returns a `CancellationError` which will be propagated
                    // back through all layers.
                    token.error_if_cancelled()?;
                    offset += self.file.write(&buffer[offset..])?;
                }
                Ok(())
            })
        }).await
    }
}

This can already improve the time until a cancellation request is handled.
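The chunked loop can be written with std only. In this sketch an `AtomicBool` stands in for the `CancellationToken`, and cancellation surfaces as an `io::Error` of kind `Other` with the message "operation cancelled"; both are assumptions. The `ChunkyWriter` test double accepts at most 4 bytes per `write` and sets the flag after a configurable number of bytes, simulating a cancel request arriving mid-copy.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};

/// Like `Write::write_all`, but checks the (stand-in) cancellation flag before
/// every chunk, so a request is noticed after at most one more `write` call.
fn write_all_cancellable<W: Write>(
    writer: &mut W,
    buffer: &[u8],
    cancelled: &AtomicBool,
) -> io::Result<()> {
    let mut offset = 0;
    while offset < buffer.len() {
        if cancelled.load(Ordering::Acquire) {
            return Err(io::Error::new(io::ErrorKind::Other, "operation cancelled"));
        }
        match writer.write(&buffer[offset..]) {
            Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
            Ok(n) => offset += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}

/// Test double: accepts at most 4 bytes per call and requests cancellation
/// once `cancel_after` bytes have been written.
struct ChunkyWriter<'a> {
    data: Vec<u8>,
    cancel_after: usize,
    cancelled: &'a AtomicBool,
}

impl Write for ChunkyWriter<'_> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(4);
        self.data.extend_from_slice(&buf[..n]);
        if self.data.len() >= self.cancel_after {
            self.cancelled.store(true, Ordering::Release);
        }
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With `cancel_after: 8`, a 20-byte write stops after exactly 8 bytes instead of running to the end.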

It could, however, be possible to reduce this time even further if the environment we are working in offers a way to cancel thread-blocking functions (which is possible using OS signal mechanisms on some platforms). In that case we could have synchronous functions that accept a CancellationToken and unblock if cancellation is requested:

impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        std::task::with_current_cancellation_token(|token| {
            spawn_blocking(|| {
                // This synchronous file API accepts a `&CancellationToken`
                // or `&dyn CancellationToken` as argument.
                self.file.write_all(buffer, token)
            })
        }).await
    }
}

Or we could go even further: standardize a location in thread-local storage where CancellationToken references are stored, make the spawn_blocking function propagate the reference automatically, and have the blocking write API pick it up implicitly.

What would our code look like then? It simply gets reduced back to:

impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        spawn_blocking(|| {
            self.file.write_all(buffer)
        }).await
    }
}

That's neat! The cancellation context would be forwarded without a single additional line of code. From here we could go even further, and make sure that async code which gets launched from within synchronous code inherits its cancellation token, arriving at a full Async - Sync - Async Sandwich 🥪:

impl FileClient {
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<(), Error> {
        spawn_blocking(|| {
            // Synchronous code which starts a nested async runtime
            tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap()
                .block_on(async {
                    spawn_blocking(|| {
                        // Yes, even here the original `CancellationToken` is visible
                        self.file.write_all(buffer)
                    }).await
                })
        }).await
    }
}
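The thread-local propagation described above might look like this. Everything here is a hypothetical std-only sketch: `CURRENT_TOKEN`, `with_token`, `current_token`, `blocking_write` and this `spawn_blocking` are not an existing API (in particular, this `spawn_blocking` is a plain-thread stand-in, not tokio's function), and the token is reduced to an `Arc<AtomicBool>`. Each spawned thread inherits the token that was current on the spawning thread, so nesting keeps it visible.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Stand-in token: a shared cancellation flag.
type Token = Arc<AtomicBool>;

thread_local! {
    /// Hypothetical standardized slot for the current cancellation token.
    static CURRENT_TOKEN: RefCell<Option<Token>> = RefCell::new(None);
}

/// Runs `f` with `token` installed as this thread's current token and restores
/// the previous one afterwards (not if `f` panics; a real version would use a guard).
fn with_token<R>(token: Token, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT_TOKEN.with(|slot| slot.replace(Some(token)));
    let result = f();
    CURRENT_TOKEN.with(|slot| *slot.borrow_mut() = previous);
    result
}

fn current_token() -> Option<Token> {
    CURRENT_TOKEN.with(|slot| slot.borrow().clone())
}

/// Runs `f` on a new thread, propagating the caller's current token to it.
fn spawn_blocking<R: Send + 'static>(
    f: impl FnOnce() -> R + Send + 'static,
) -> thread::JoinHandle<R> {
    let token = current_token();
    thread::spawn(move || match token {
        Some(token) => with_token(token, f),
        None => f(),
    })
}

/// Stand-in for a blocking write API that implicitly consults the current token.
fn blocking_write() -> Result<(), &'static str> {
    match current_token() {
        Some(token) if token.load(Ordering::Acquire) => Err("cancelled"),
        _ => Ok(()),
    }
}
```

A `blocking_write` call nested two `spawn_blocking` levels below a cancelled token still reports `Err("cancelled")`, while code running without any token sees `Ok(())`.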

Option A) request_cancellation()

Given there is no directly visible CancellationToken value or reference but only a callback, there is nothing we can directly forward to the synchronous function.

We could set up some CancellationToken-like object which is made available to the synchronous function, and let request_cancellation cancel it. However, this seems more like a mixture of Option A) and D) than a pure variant of Option A).

Option B) fn poll_cancel()

This is again similar to Option A). However, poll_cancel() would mostly be implemented by sending some kind of cancellation request once to the synchronous function, and then waiting for it to complete.

Summing up

Now that we've looked at a variety of use-cases - what have we actually learned from them?

Here are some thoughts:

  • The CancellationToken and request_cancellation approaches often look fairly similar in behavior, probably because the CancellationToken also allows attaching a request_cancellation-like callback.
  • The CancellationToken approach requires less code to be touched to add graceful cancellation support.
  • The request_cancellation approach requires some extra thought on how to marshal the cancellation request into the executing task to prevent multi-threading issues. With CancellationTokens, the requirement is instead that users of CancellationTokens (who are going to attach a callback) make sure the cancellation path is thread-safe. That requirement might be relaxable if there exists only a single-threaded runtime and cancellation requests are only issued within this runtime, but this might be very tricky to express in the type system.
  • Async/Sync sandwiches are very natural to express using CancellationTokens. The other mechanisms don't seem to be as good a fit for this use case.
  • CancellationTokens could even be made available for the current Future trait, and do not depend on the introduction of a new trait.
  • async fn cancel() seems mostly suitable for single-layer cancellation, and does not compose well throughout multiple layers.
  • poll_cancel() seems like an interesting variant on request_cancellation, but leaves a few more questions open. The method is probably also underrepresented in this article, and requires a bit more research for the best possible comparison.
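To ground the points about attaching callbacks and keeping the cancellation path thread-safe, here is a minimal `std`-only sketch of a token with callback registration. The `CancellationToken`, `cancel`, `is_cancelled` and `on_cancel` names are illustrative assumptions, not a proposed API. One design choice worth noting: callbacks run on the cancelling thread after the lock is released, so a callback may query the token without deadlocking. Removing a registered callback is not supported here, which mirrors one of the open questions.

```rust
use std::sync::{Arc, Mutex};

type Callback = Box<dyn FnOnce() + Send>;

#[derive(Default)]
struct State {
    cancelled: bool,
    callbacks: Vec<Callback>,
}

/// Hypothetical cancellation token: cheap to clone, shareable across threads.
#[derive(Clone, Default)]
pub struct CancellationToken {
    state: Arc<Mutex<State>>,
}

impl CancellationToken {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn is_cancelled(&self) -> bool {
        self.state.lock().unwrap().cancelled
    }

    /// Registers a request_cancellation-like callback. If the token is
    /// already cancelled, the callback runs immediately on this thread.
    pub fn on_cancel<F: FnOnce() + Send + 'static>(&self, f: F) {
        let mut state = self.state.lock().unwrap();
        if state.cancelled {
            drop(state);
            f();
        } else {
            state.callbacks.push(Box::new(f));
        }
    }

    /// Requests cancellation. Each callback runs once, on the cancelling
    /// thread, after the lock has been released.
    pub fn cancel(&self) {
        let callbacks = {
            let mut state = self.state.lock().unwrap();
            if state.cancelled {
                return;
            }
            state.cancelled = true;
            std::mem::take(&mut state.callbacks)
        };
        for cb in callbacks {
            cb();
        }
    }
}

fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let token = CancellationToken::new();
    let hits = Arc::new(AtomicUsize::new(0));
    let h = Arc::clone(&hits);
    token.on_cancel(move || {
        h.fetch_add(1, Ordering::SeqCst);
    });
    // Cancel from another thread: the callback path must be thread-safe.
    let t = token.clone();
    std::thread::spawn(move || t.cancel()).join().unwrap();
    println!(
        "cancelled={} callbacks run={}",
        token.is_cancelled(),
        hits.load(Ordering::SeqCst)
    );
}
```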

Overall, the CancellationToken approach looks like the most promising option. It handles all of the use-cases above well, it allows the same tool to be used for graceful cancellation of both asynchronous and synchronous functions, and its usage is already well understood thanks to widespread industry adoption.

Open questions

However, some design choices would still need to be figured out before it could be used in asynchronous Rust. We will look at them in more detail at another time, but here is a short list:

  • Should CancellationTokens be passed explicitly, or be implicitly available via Context?
  • Are CancellationTokens Send or Sync?
  • Are CancellationTokens Clone?
  • Will users interact with a struct CancellationToken, or with a &dyn CancellationToken whose implementation is defined by runtimes? Or would it even be a struct CancellationToken whose implementation is defined by runtimes, like std::task::Waker today?
  • What about memory allocations required for CancellationTokens? What about no-std?
  • How is synchronization handled for attaching and removing cancellation callbacks?
  • Should runtimes make one implicit CancellationToken available per task (similar to Java/Kotlin), or should users explicitly create the required tokens?
  • And obviously the most important one: what should this mechanism actually be called?
    • CancellationToken
    • StopToken
    • SomethingCompletelyDifferent

Appendix

CancellationTokens in other languages

The CancellationToken approach is used in the following other environments:

  • C# / .NET Framework: The .NET runtime uses the CancellationToken API extensively. Most asynchronous operations accept a CancellationToken as an optional parameter, which allows the operation to be cancelled.
  • Go provides the context.Context type, which represents a mixture of a CancellationToken and a mechanism for task-local storage. Context parameters are passed along the call chain, so that cancellation-aware code is able to respond to cancellation requests. It does so by monitoring the channel returned by Done(), which is closed, and therefore becomes readable, once cancellation is requested.
  • The C++20 std::jthread type represents a thread which joins on destruction. To prevent hanging threads and maximize the chance of destroyed threads joining as soon as possible, the jthread ecosystem introduces std::stop_token, which is equivalent to a CancellationToken. Each jthread is associated with one stop_token. This token can either be cancelled explicitly by calling jthread::request_stop(), or cancellation will be requested implicitly if the jthread handle is destroyed before the thread has run to completion. The stop_token is passed directly as an argument to the thread function, and can from there be forwarded to other functions which support cancellation, e.g. the stop-token-aware wait overloads of std::condition_variable_any. Interestingly, C++ uses a mixture of the C#/Go and Java/Kotlin styles: while code in C# and Go only ever explicitly interacts with a CancellationToken type that is fully decoupled from any particular thread/task instance, Java, Kotlin and C++ all associate one particular CancellationToken with an active thread/task. However, while Java and Kotlin don't provide any direct reference to this token (only its methods are available via thread-local accessors), C++ passes the object around explicitly.
  • Kotlin does not have a public CancellationToken type. However, it embeds a variant of a CancellationToken inside its Job. Each Job (which is equivalent to a Rust task) can be requested to be cancelled by calling cancel() on the job handle. This changes the Job's state to cancelled. Code running inside the Job can query the current state using the isActive property, or can get notified via a callback that is invoked once cancellation is triggered. Semantically, that is pretty much equivalent to a unique CancellationToken instance embedded inside the Job state.
  • Java is similar to Kotlin, except that its cancellation state is per thread and not per task. Java uses the Thread.interrupt() mechanism to request threads to stop. Code inside the thread can use Thread.interrupted() to check for cancellation, and many blocking operations (such as Thread.sleep() and Object.wait()) are directly integrated with the interruption mechanism and will unblock and throw an InterruptedException once interruption is requested. That again is conceptually similar to embedding a single CancellationToken inside the thread's state.
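The C++ jthread behavior described above (request a stop on destruction, then join) maps fairly directly onto Rust's `Drop`. The following is a minimal `std`-only sketch; `JThread` and `StopToken` are hypothetical names chosen to mirror the C++20 types, and the token here is a plain `AtomicBool` rather than a callback-capable CancellationToken.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical read-only view of the stop state, like std::stop_token.
#[derive(Clone)]
pub struct StopToken(Arc<AtomicBool>);

impl StopToken {
    pub fn stop_requested(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Hypothetical Rust counterpart of std::jthread.
pub struct JThread {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl JThread {
    /// As in C++, the thread function receives the token as an argument.
    pub fn spawn<F>(f: F) -> Self
    where
        F: FnOnce(StopToken) + Send + 'static,
    {
        let stop = Arc::new(AtomicBool::new(false));
        let token = StopToken(Arc::clone(&stop));
        let handle = thread::spawn(move || f(token));
        JThread { stop, handle: Some(handle) }
    }

    /// Explicit cancellation, like jthread::request_stop().
    pub fn request_stop(&self) {
        self.stop.store(true, Ordering::Release);
    }
}

impl Drop for JThread {
    /// Implicit cancellation: request a stop, then join the thread.
    fn drop(&mut self) {
        self.request_stop();
        if let Some(handle) = self.handle.take() {
            // Ignore a panic in the worker rather than panicking in drop.
            let _ = handle.join();
        }
    }
}

fn main() {
    let worker = JThread::spawn(|token| {
        while !token.stop_requested() {
            // Simulated unit of cancellation-aware work.
            thread::sleep(Duration::from_millis(1));
        }
        println!("worker observed the stop request");
    });
    thread::sleep(Duration::from_millis(10));
    // Dropping the handle requests a stop and waits for the thread to exit.
    drop(worker);
    println!("worker joined");
}
```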