Dart provides ways to structure complex long-running asynchronous computations using the Future and Stream APIs as well as async/await syntactic sugar. Future and Stream objects are related in the sense that both represent a communication channel between a consumer (or consumers) and an asynchronous producer of data. Future is a box for a single asynchronously produced value, while Stream is a sequence of zero or more asynchronously produced values.
Despite these similarities, there are two important differences between Future and Stream:
- Stream has built-in support for cancellation (see StreamSubscription.cancel), allowing a subscriber to stop listening. Importantly, the producer (e.g. StreamController) is notified when the consumer cancels, which allows it to stop the underlying asynchronous process and potentially cancel other subscriptions recursively.
- Streams don't provide a delivery guarantee: a value added to the stream might be lost if the subscriber decides to cancel its subscription.
Stream subscription cancellation allows a consumer to notify the producer that it no longer needs to produce values. Future lacks similar functionality, meaning that asynchronous computations which use Future to communicate their results always run to completion.
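The producer-side notification described above can be illustrated with the existing StreamController API. This is a minimal sketch: TickProducer and takeThreeThenCancel are hypothetical names introduced only for this example, and the 10 ms timer stands in for real asynchronous work.

```dart
import 'dart:async';

/// Hypothetical producer: emits an incrementing counter until the
/// subscriber cancels its subscription.
class TickProducer {
  /// Set to true once the producer has been told to stop.
  bool stopped = false;
  Timer? _timer;
  late final StreamController<int> _controller;

  TickProducer() {
    var next = 0;
    _controller = StreamController<int>(
      // Start producing only when somebody listens.
      onListen: () {
        _timer = Timer.periodic(const Duration(milliseconds: 10), (_) {
          _controller.add(next++);
        });
      },
      // Invoked when the subscriber cancels: the producer stops its work.
      onCancel: () {
        _timer?.cancel();
        stopped = true;
      },
    );
  }

  Stream<int> get stream => _controller.stream;
}

/// Collects the first three values, then cancels the subscription.
Future<List<int>> takeThreeThenCancel(TickProducer producer) async {
  final received = <int>[];
  final done = Completer<void>();
  late final StreamSubscription<int> sub;
  sub = producer.stream.listen((value) {
    received.add(value);
    if (received.length == 3) {
      sub.cancel(); // The controller's onCancel callback runs as a result.
      done.complete();
    }
  });
  await done.future;
  return received;
}
```

A Future returned by an ordinary async function offers no comparable hook: the consumer can stop awaiting it, but the producer is never told.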
It's a commonly requested feature to extend Future and async functions with cancellation support, that is, to allow canceling pending asynchronous work. One common use case is to allow canceling heavy business logic (e.g. API requests and complicated decoding) when the user navigates away from the UI that needs the results of this logic. Currently the only option we offer in package:async is to wrap business logic in a CancellableOperation, which when canceled simply swallows the value produced by the wrapped async operation. CancellableOperation does not actually attempt to interrupt the asynchronous operation it wraps or cancel other asynchronous operations that it depends on. An evolution of CancellableOperation is CancellableZone, which attempts to stop the propagation of values produced by nested asynchronous operations by submerging the whole computation into a custom Zone and intercepting listener callbacks.
Among reviewed languages only Kotlin provides cancellation support which is end-to-end integrated into other asynchronous primitives.
C++ has no support for cancellation in the std::promise/std::future API or in coroutine syntax.
JavaScript Promise does not support cancellation. There was a proposal to introduce cancellation into the language, but it was abandoned when its champion became inactive in TC39. The archived proposal can be viewed here.
Task represents a unit of asynchronous work in Swift. Task supports cooperative cancellation:
- Task.cancel() marks the task as canceled.
- Task.isCancelled and Task.checkCancellation() allow checking if the currently running task is canceled.
There is no automatic integration between asynchronous functions and task cancellation; instead the developer has to manually check for cancellation or wrap the computation in withTaskCancellationHandler.
C# is similar to Swift: cooperative task cancellation through CancellationToken.
Kotlin's coroutines provide first-party support for cooperative cancellation:
- cancel methods allow canceling a Job or CoroutineScope and all its children with a given cancellation cause.
- CoroutineScope.isActive allows checking for cancellation explicitly.
What differentiates Kotlin from Swift and C# is that all suspending functions from kotlinx.coroutines are cancellable: they throw CancellationException when canceled. Consider for example Promise<T>.await, which contains the following text in its documentation:
This suspending function is cancellable. If the Job of the current coroutine is cancelled or completed while this suspending function is waiting, this function stops waiting for the promise and immediately resumes with CancellationException. There is a prompt cancellation guarantee. If the job was cancelled while this function was suspended, it will not resume successfully.
This effectively means that as long as you structure your code using kotlinx.coroutines primitives (which include Deferred and Flow, the analogues of Future and Stream in Dart), your asynchronous code will be recursively cancellable.
Any proposed design must maintain the following property: if an asynchronous computation produces a value then this value must reach the consumer. Concretely we want to avoid simply dropping any objects to the floor because they might, for example, have cleanup actions associated with them.
We want to ensure that both synchronous and asynchronous versions of the code below provide the same guarantees to the developer.
Resource acquire();

void consume() {
  final r = acquire();
  r.release();
}

Future<Resource> asyncAcquire();

void asyncConsume() async {
  final r = await asyncAcquire();
  r.release();
}
Additionally, we would like to restrict usage of this feature at this initial stage, so we will put the implementation under the experimental flag cancellable-futures.
The design below has similarities to Lasse's original sketch but differs in some aspects (e.g. we avoid changing the Future interface).
We propose to extend dart:async with a CancellableFuture interface and make all built-in Future implementations implement it.
// NOTE: This is an experimental feature and might be removed in future releases.
abstract interface class CancellableFuture {
  /// Indicate that the value of this future is no longer necessary.
  ///
  /// If cancellation is successful this [Future] will eventually
  /// complete.
  ///
  /// This will attempt to cancel the underlying computation
  /// but the attempt is not guaranteed to succeed and the Future
  /// might still complete successfully.
  ///
  /// This throws [UnsupportedError] if cancellable-futures is not enabled.
  void tryCancel({Object reason = const CancelException()});
}

// NOTE: This is an experimental feature and might be removed in future releases.
abstract class CancelException implements Exception {
  const CancelException([dynamic message]);
}

void _defaultOnCancel(Completer completer, {required Object reason}) {
  completer.completeError(reason);
}

abstract interface class Completer<T> {
  // NOTE: This is an experimental feature and might be removed in future releases.
  // `onCancel` is optional: it falls back to [_defaultOnCancel].
  factory Completer.cancellable({
    void Function(Completer, {required Object reason}) onCancel = _defaultOnCancel,
  });
}
When CancellableFuture.tryCancel(reason) is called the following happens:
- If the cancellable-futures experiment is not enabled, then throw UnsupportedError('this feature requires cancellable-futures experiment').
- If this CancellableFuture has already completed, then nothing happens.
- If this CancellableFuture was produced from another CancellableFuture (e.g. through then), then we recursively invoke tryCancel(reason) on those Futures.
- If this CancellableFuture was produced from a Stream (e.g. the result of Stream.toList()), then canceling this future cancels the underlying subscription.
- If this CancellableFuture was produced by a Completer.cancellable, then we invoke the onCancel callback, passing the Completer object and the given reason. onCancel can then decide how to proceed. The default implementation simply completes the Completer with the given reason as an error.
- If this CancellableFuture was produced by an async function, then we mark the underlying computation as canceled, which has the following effect:
  - If the async function is suspended at an await f statement and f is CancellableFuture, we will attempt to invoke f.tryCancel(reason).
  - If the async function is resumed and reaches an await f statement, then before suspending we will invoke f.tryCancel(reason) if f is CancellableFuture.
  - For simplicity, we assume that await for is desugared in terms of await, so we don't describe its semantics here. Fundamentally, canceling an async computation which is processing a Stream should cancel the subscription to the stream.
  - Note that a canceled async computation might still complete successfully if it reaches return.
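The Completer.cancellable rules above (no effect after completion, otherwise delegate to onCancel, which by default completes with the reason as an error) can be modelled in user-land code today. The sketch below is not the proposed dart:async implementation: CancellableCompleterModel and its local CancelException are hypothetical stand-ins, and the callback takes the reason as a positional parameter for brevity.

```dart
import 'dart:async';

/// Hypothetical stand-in for the proposed CancelException.
class CancelException implements Exception {
  const CancelException();
}

/// Hypothetical user-land model of the proposed `Completer.cancellable`.
class CancellableCompleterModel<T> {
  CancellableCompleterModel({this.onCancel});

  final Completer<T> completer = Completer<T>();

  /// Decides what cancellation means; null selects the default behavior.
  final void Function(Completer<T> completer, Object reason)? onCancel;

  Future<T> get future => completer.future;

  void tryCancel([Object reason = const CancelException()]) {
    // Already completed: nothing happens.
    if (completer.isCompleted) return;
    final callback = onCancel;
    if (callback != null) {
      callback(completer, reason);
    } else {
      // Default behavior: complete with the reason as an error.
      completer.completeError(reason);
    }
  }
}

/// Default behavior: the consumer observes the cancellation as an error.
Future<String> cancelWithDefault() async {
  final model = CancellableCompleterModel<int>();
  model.tryCancel();
  try {
    await model.future;
    return 'completed';
  } on CancelException {
    return 'cancelled';
  }
}

/// Custom behavior: the producer completes normally with a fallback value.
Future<int> cancelWithFallback() {
  final model = CancellableCompleterModel<int>(
    onCancel: (completer, reason) => completer.complete(-1),
  );
  model.tryCancel();
  return model.future;
}
```

The second function shows why cancellation is only an attempt: the producer remains free to complete successfully.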
For the purposes of interoperability between Stream cancellation and Future cancellation we might consider changing the semantics of async* in the following way:
- When a subscription to the async* generator is canceled and the corresponding async* generator is suspended on await f where f is CancellableFuture, then we invoke f.tryCancel().
  - Caveat: There is no way of getting a cancellation reason here because stream subscription cancellation does not provide one.
Future.timeout() will be changed to cancel the underlying computation when the timeout is reached.
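For context, here is a small sketch of how Future.timeout() behaves without cancellation support: the consumer stops waiting, but the timed-out computation keeps running. The names slowWork, stepsFinished and stepsAfterTimeout are hypothetical, and the delays are arbitrary values chosen so that the timeout fires well before the work finishes.

```dart
import 'dart:async';

/// Counts how many steps of the simulated work have finished.
int stepsFinished = 0;

/// Simulated long-running work: five steps of roughly 20 ms each.
Future<String> slowWork() async {
  for (var i = 0; i < 5; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 20));
    stepsFinished++;
  }
  return 'done';
}

/// Times out after 30 ms, then reports how far the work eventually got.
Future<int> stepsAfterTimeout() async {
  stepsFinished = 0;
  final result = await slowWork().timeout(
    const Duration(milliseconds: 30),
    onTimeout: () => 'timed out',
  );
  assert(result == 'timed out');
  // Wait long enough for the abandoned work to run to completion.
  await Future<void>.delayed(const Duration(milliseconds: 300));
  return stepsFinished;
}
```

Here stepsAfterTimeout() completes with 5: every step ran even though nobody was waiting for the result any more. Under the proposed change the timeout would instead request cancellation of the underlying computation.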
We should review all APIs (most importantly dart:io and dart:isolate) and incorporate cancellation handling into them. For example, Isolate.run should likely propagate a cancellation request from the awaiting isolate to the isolate executing the operation.
Code written using Completer.cancellable can check for cancellation by checking Completer.isCompleted, but async functions don't have direct access to the underlying Completer.
We propose the following API to allow access to the completion state:
// dart:async
abstract class CancellationToken {
  /// Get [CancellationToken] for the surrounding `async` function.
  ///
  /// Throws [UnsupportedError] if not invoked directly from `async` function.
  static CancellationToken get current;

  bool get isCancelled;

  /// This future completes if the [CancellationToken] is marked as cancelled.
  Future<void> asFuture();
}
This API can be used like so:
Future<void> foo() async {
  final token = CancellationToken.current; // OK
  longRunning(..., token);

  void cb() {
    CancellationToken.current.isCancelled; // UnsupportedError
  }
}

void longRunning(CancellationToken token) {
  if (token.isCancelled) {
    // ...
  }
  token.asFuture().then((_) => something.cancel());
}
An alternative approach could be to come up with a special syntax which would only make sense inside async functions, but it is unclear if it is worth introducing special syntax for such a fairly specialized use case.
- Propagating cancellation through the chain of Future objects requires linking them in both directions: currently if you do f2 = f1.then((v) => ...), f2 does not actually remember that it is waiting for f1 to complete; instead f1 knows that it needs to propagate its result to f2. This will introduce additional memory overhead.
- There will be some additional runtime overhead at await statements, though it is likely going to be negligible.
One of the main concerns around Future cancellation in general is its behavior in the presence of multiple Future listeners.
Future<T> f = computation();
var f1 = f.then(handleValueOneWay);
var f2 = f.then(handleValueAnotherWay);
(f1 as CancellableFuture).tryCancel();
The proposed behavior for CancellableFuture would be to cancel the underlying computation by canceling f, ignoring the fact that f has other listeners. This would lead both f1 and f2 to complete with a CancelException error.
We can choose between:
- Do not cancel f at all if it has multiple listeners.
- Cancel if all listeners cancel.
- Cancel if any listener cancels (proposed behavior).
- Throw an error when trying to cancel a Future with more than one listener.
- Make each Completer aware of the number of active listeners and let it decide whether the underlying computation should be canceled.
It is worth noting that the proposed behavior is flexible enough to allow users to build their own BroadcastingFuture which only cancels the computation if all of its listeners have canceled:
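class BroadcastingFuture<T> implements CancellableFuture {
  BroadcastingFuture(this.inner);

  final Future<T> inner;
  int activeListeners = 0;

  Future<R> then<R>(FutureOr<R> Function(T value) onValue, {Function? onError}) {
    final result = Completer<R>.cancellable(onCancel: (completer, {required reason}) {
      // Complete this listener's future with the cancellation reason.
      completer.completeError(reason);
      // Cancel the shared computation only once every listener has canceled.
      if (--activeListeners == 0) {
        tryCancel(reason: reason);
      }
    });
    inner.then((v) {
      // Skip listeners which have already been canceled.
      if (!result.isCompleted) {
        result.complete(onValue(v));
      }
    });
    activeListeners++;
    return result.future;
  }

  @override
  void tryCancel({Object reason = const CancelException()}) {
    (inner as CancellableFuture).tryCancel(reason: reason);
  }
}

final f = BroadcastingFuture<T>(computation());
var f1 = f.then(handleValueOneWay);
var f2 = f.then(handleValueAnotherWay);
(f1 as CancellableFuture).tryCancel();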
- Backends (e.g. JS or Wasm) are free to choose not to support this feature at all and simply produce Future objects which don't implement the marker interface.
None at the moment.
- Fixed description of CancellableOperation (it does not use Zones), added link to CancellableZone POC from @lrhn.
- Added Concerns section and documented a common concern around canceling a Future with multiple listeners.
- Added section introducing a way to access the cancellation state of a running async function.
- Documented that we also want to integrate existing code with cancellation:
  - Future.timeout will cancel the underlying computation when the timeout is reached.
  - Isolate.run should propagate a cancellation request from the calling isolate to the isolate which is running the computation.
- API changes
  - CancelledOperation is now CancelException (which implements Exception).
  - tryCancel takes reason and propagates it to the Completer onCancel callback.
  - Completer.cancellable: the onCancel callback (previously tryCancel) now gets the Completer object itself and the cancellation reason object (an instance of CancelException by default). It can choose to complete normally or throw some error. The default implementation simply completes with the given error, but developers using this API are free to decide if they want some other behavior.
  - Removed Completer.isCancelled - users can simply check Completer.isCompleted.
  - Removed dropValue callback - most likely not worth it. Users should simply take care of checking Completer.isCompleted manually and handling this situation properly if they use Completer.cancellable.
[nit] There is no Zone involved in the implementation of CancellableOperation.