Skip to content

Instantly share code, notes, and snippets.

@mraleph
Last active July 17, 2024 10:00
Show Gist options
  • Save mraleph/6daf658c95be249c2f3cbf186a4205b9 to your computer and use it in GitHub Desktop.
Save mraleph/6daf658c95be249c2f3cbf186a4205b9 to your computer and use it in GitHub Desktop.

Cancellable Future

Dart provides ways to structure complex long running asynchronous computations using Future and Stream APIs as well as async/await syntactic sugar. Future and Stream objects are related in a sense that both represent a communication channel between a consumer (or consumers) an an asynchronous producer of data. Future is a box for a single asynchronously produced value, while Stream is a sequence of zero or more asynchronously produced values.

Despite similarities there are two important differences between Future and Stream:

  • Stream has built-in support for cancellation (see StreamSubscription.cancel) allowing subscriber to stop listening. Importantly producer (e.g. StreamController) is notified when consumer cancels, which allows it to stop whatever asynchronous process and potentially cancel other subscriptions recursively.
  • Streams don't provide delivery guarantee, a value added to the stream might be lost if subscriber decides to cancel its subscription.

Stream subscription cancellation allows consumer to notify producer that it no longer needs to produce values. Future lacks similar functionality meaning that asynchronous computations which use Future to communicate their results always run to completion.

It's a commonly requested feature to extend Future and async functions with cancellation support: to allow canceling pending asynchronous work. One common use case is to allow canceling heavy business logic (e.g. API requests and complicated decoding) when user navigates away from the UI that needs results of this logic. Currently the only option we offer in package:async is to wrap business logic in a CancellableOperaton which when canceled simply swallows the value produced by the wrapped async operation. CancellableOperation does not actually attempt to interrupt the asynchronous operation it wraps or cancel other asynchronous operation that it depends on. An evolution of CancellableOperation is CancellableZone which attempts to stop the propagation of values produced by nested asynchronous operations by submerging the whole computation into a custom Zone and intercepting listener callbacks.

Other programming languages

Among reviewed languages only Kotlin provides cancellation support which is end-to-end integrated into other asynchronous primitives.

C++

No support for cancellation in std::promise/std::future API or coroutine syntax.

JavaScript

JavaScript Promise does not support cancellation. There was a proposal to introduce cancellation into the language but it was abandoned when its champion became inactive in TC-39. Archived proposal can be viewed here.

Swift

Task represents a unit of asynchronous work in Swift. Task supports cooperative cancellation:

There is no automatic integration between asynchronous functions and task cancellation, instead developer has to manually check for cancellation or wrap computation into Task.withTaskCancellationHandler.

C#

Similar to Swift: cooperative task cancellation through CancellationToken.

Kotlin

Kotlin's coroutines provide first party support for cooperative cancellation:

  • cancel methods allows to cancel a Job or CoroutineScope and all its children with a given cancellation cause.
  • CoroutineScope.isActive allows to check for cancellation explicitly

What differentiates Kotlin from Swift and C# is that all suspending functions from kotlinx.coroutines are cancellable: they throw CancellationException when canceled. Consider for example Promise<T>.await which contains the following text in its documentation:

This suspending function is cancellable. If the Job of the current coroutine is cancelled or completed while this suspending function is waiting, this function stops waiting for the promise and immediately resumes with CancellationException. There is a prompt cancellation guarantee. If the job was cancelled while this function was suspended, it will not resume successfully.

This effectively means that as long as you structure your code using kotlinx.coroutines primitives (which includes Defered and Flow - which are analogues of Future and Stream in Dart) your asynchronous code will be recursively cancellable.

Design Requirements

Any proposed design must maintain the following property: if an asynchronous computation produces a value then this value must reach the consumer. Concretely we want to avoid simply dropping any objects to the floor because they might, for example, have cleanup actions associated with them.

We want to ensure that both synchronous and asynchronous versions of the code below provide the same guarantees to the developer.

Resource acquire();
void consume() {
  final r = acquire();
  r.release();
}

Future<Resource> asyncAcquire();
void asyncConsume() async {
  final r = await asyncAcquire();
  r.release();
}

Additionally we would like to restrict usage of this feature on this initial stages, so will put the implementation under the experimental flag cancellable-futures.

Proposed Design

Design below has similarities to Lasse's original sketch but differs in some aspects (e.g. we avoid changing Future interface)

We propose to extend dart:async with CancellableFuture interface and make all built-in Future implementations implement it.

// NOTE: This is an experimental feature and might be removed in future releases.
abstract interface class CancellableFuture {
  /// Indicate that the value of this future is no longer necessary.
  /// 
  /// If cancellation is successful this [Future] will eventually
  /// compelete.
  ///  
  /// This will attempt to cancel the underlying computation 
  /// but the attempt is not guarateed to succeed and the Future
  /// might still complete successfully.
  /// 
  /// This throws [UnsupportedError] if cancellable-futures is not enabled.
  void tryCancel({Object reason = const CancelException()});
}

// NOTE: This is an experimental feature and might be removed in future releases.
abstract class CancelException implements Exception {
  const CancelException([dynamic message]);
}

void _defaultOnCancel(Completer completer, {required Object reason}) {
  completer.completeError(reason);
}

abstract interface class Completer<T> {
  // NOTE: This is an experimental feature and might be removed in future releases.
  factory Completer.cancellable({
      required void Function(Completer, {required Object reason}) onCancel = _defaultOnCancel,
  });
}

When CancellableFuture.tryCancel(reason) is called the following happens:

  • If experiment cancellable-futures is not enabled then throw UnsupportedError('this feature requires cancellable-futures experiment').
  • If this CancellableFuture has been already completed then nothing happens.
  • If this CancellableFuture was produced from another CancellableFuture (e.g. through then) then we recursively invoke cancel(reason) on those Future's.
  • If CancellableFuture is produced from a Stream (e.g. result of Stream.toList()) then canceling this future cancels the underlying subscription.
  • If CancellableFuture was produced by a Completer.cancellable then we invoke onCancel callback passing the Completer object and the given reason. onCancel can then decide how to proceed. The default implementation would simply complete the Completer with the given reason error.
  • If this CancellableFuture was produced by an async function then we mark the underlying computation as canceled, which has the following effect:
    • If async function is suspended at an await f statement and f is CancellableFuture , we will attempt to invoke f.tryCancel(reason).
    • If async function is resumed and reaches an await f statement, then before suspending we will invoke f.tryCancel(reason) if f is CancellableFuture.
    • For simplicity, we assume that await for is desugared in terms of await so we don't describe its semantics here. Fundamentally, canceling async computation which is processing the Stream should cancel the subscription to the stream.
    • Note that canceled async computation might still complete successfully if it reaches return.

For the purposes of interoperability between Stream cancellation and Future cancellation we might consider changing the semantics of async* in the following way:

  • When subscription to the async* generator is canceled and the corresponding async* generator is suspended on await f where f is CancellableFuture then we invoke f.tryCancel().
    • Caveat: There is no way of getting cancellation reason here because stream subscription cancellation does provide one.

Future.timeout() will be changed to cancel the underlying computation when timeout if reached.

We should review all APIs (most importantly dart:io and dart:isolate and incorporate cancellation handling into them). For example, Isolate.run should likely propagate cancellation request from the awaiting isolate to the isolate executing the operation.

Checking for cancellation in async functions

Code written using Completer.cancellable can check for cancellation by checking Completer.isCompleted, but async functions don't have direct access to the underlying Completer.

We propose the following API to allow access to the completion state:

// dart:async

abstract class CancellationToken {
  /// Get [CancellationToken] for the surrounding `async` function.
  ///
  /// Throws [UnsupportedError] if not invoked directly from `async` function.
  static CancellationToken get current;

  bool get isCancelled;
  
  /// This future completes if the [CancellationToken] is marked as cancelled.
  Future<void> asFuture();
}

This API can can be used like so:

Future<void> foo() async {
  final token = CancellationToken.current;  // OK
  longRunning(..., token); 
  void cb() {
    CancellationToken.current.isCancelled; // UnsupportedError
  }
}

void longRunning(CancellationToken token) {
  if (token.isCancelled) {
    // ...
  }
    
  token.asFuture().then((_) => something.cancel());
}

An alternative approach could be to come up with a special syntax which would only make sense inside async functions - but it is unclear if it is worth introducing special syntax for such a fairly specialized use case.

Cancellation Overheads

  • Propagating cancellation through the chain of Future objects requires linking them in both directions: currently if you do f2 = f1.then((v) => ...) f2 does not actually remember that it is waiting for f1 to complete, instead f1 knows that it needs to propagate result to f2. This will introduce additional memory overhead.
  • There will be some additional runtime overhead at await statements - though it is likely going to be negligible.

Concerns

One of the main concerns around Future cancellation in general is its behavior in the presence of multiple Future listeners.

Future<T> f = computation();
var f1 = f.then(handleValueOneWay);
var f2 = f.then(handleValueAnotherWay);
(f1 as CancellableFuture).tryCancel();

The proposed behavior for CancellableFuture would be to cancel the underlying computation by canceling f ignoring the fact that f has other listeners. This would lead both f1 and f2 to complete with CancelException error.

We can choose between

  • Do not cancel f at all if it has multiple listeners.
  • Cancel if all listeners cancel.
  • Cancel if any listener cancels (proposed behavior).
  • Throw an error when trying to cancel a Future with more than one listener.
  • Make each Completer aware about the number of active listeners and let it decide whether underlying computation should be canceled.

It is worth noting that proposed behavior is flexible enough to allows users to build their own BroadcastingFuture which only cancels the computation if all of its listeners canceled:

class BroadcastingFuture<T> implements CancellableFuture {
  final Future<T> inner;
  int activeListeners = 0;
    
  @override
  Future<R> then<R>(FutureOr<R> onValue(T value), {Function? onError}) {
    final result = Completer<R>.cancellable(tryCancel: (reason) {
      if (--activeListeners == 0) {
        inner.tryCancel(reason);    
      }
      return true;
    });
    inner.then((v) {
      if (!v.isCancelled) {
          result.complete(onValue(v));
      }
    });
    activeListeners++;
    return result.future;
  }
}

final f = BroadcastingFuture<T>(computation());
var f1 = f.then(handleValueOneWay);
var f2 = f.then(handleValueAnotherWay);
f1.tryCancel();

Portability

  • Backends (e.g. JS or Wasm) are free to choose to not support this feature at all and simply produce Future objects which don't implement the marker interface.

Open Questions

None at the moment.

Changes

  • Fixed description of CancellableOperation (it does not use Zones), added link to CancellableZone POC from @lrhn.
  • Added Concerns section and documented common concern around canceling Future with multiple listeners.
  • Added section introducing a way to access cancellation state of a running async function.
  • Documented that we also want to integrate existing code with cancellation:
    • Future.timeout will cancel underlying computation when timeout is reached.
    • Isolate.run should propagate cancellation request from calling isolate to the isolate which is running the computation.
  • API changes
    • CancelledOperation is not CancelException (which implements Exception).
    • tryCancel takes reason and propagates it to Completer onCancel callback.
    • Completer.cancellable: onCancel callback (previously tryCancel) now gets Completer object itself and the cancellation reason object (an instance of CancelException by default). It can choose to complete normally or throw some error. Default implementation simply completes with the given error, but developers using this API are free to decide if they want some other behavior.
    • Removed Completer.isCancelled - users can simply check Completer.isCompleted.
    • Removed dropValue callback - most likely not worth it. Users should simply take care of checking Completer.isCompleted manually and handling this situation properly if they use Completer.cancellable.
@natebosch
Copy link

Currently the only option is to wrap business logic in a CancellableOperaton which suppresses propagation of the produced values through the graph of Future's by intercepting listener callbacks via a custom Zone.

[nit] There is no Zone involved in the implementation of CancellableOperation.

@natebosch
Copy link

If this CancellableFuture was produced from another CancellableFuture (e.g. through then) then we recursively invoke cancel on those Future's.

Cancel propagation is configurable with CancellableOperation. If we propagate unconditionally I expect users will run into unexpected behavior when a Future that is awaited from multiple places gets canceled for both if only 1 of the places wanted to cancel.

@mraleph
Copy link
Author

mraleph commented Jul 6, 2023

@natebosch Thanks!

[nit] There is no Zone involved in the implementation of CancellableOperation.

I got my wires crossed with CancellableZone. Updated the text.

If we propagate unconditionally I expect users will run into unexpected behavior when a Future that is awaited from multiple places gets canceled for both if only 1 of the places wanted to cancel.

This is a common concern. Update the text to include it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment