Skip to content

Instantly share code, notes, and snippets.

@filip-sakel
Last active December 6, 2021 07:55
Show Gist options
  • Save filip-sakel/605e3b4b10025cdae3fffd114ee56c7d to your computer and use it in GitHub Desktop.
Save filip-sakel/605e3b4b10025cdae3fffd114ee56c7d to your computer and use it in GitHub Desktop.
A Swift concurrency structure that safely races two tasks against each other. This structure is useful for implementing operators like `debounce` for `AsyncSequence`.
//
// TaskRacing.swift
//
// Created by Filip Sakel on 21/6/21.
//
import Dispatch
// MARK: - Utilities
/// A duration expressed as a (possibly fractional) number of seconds.
struct Time {
    /// The length of the duration, in seconds.
    let seconds: Double

    /// Creates a duration lasting `seconds` seconds.
    init(seconds: Double) {
        self.seconds = seconds
    }
}
extension Task {
    /// Suspends the current task for approximately the given amount of time.
    ///
    /// Sleep doesn't work properly on the Xcode 13 beta 1 toolchain, so the delay is
    /// implemented by scheduling a block on the main dispatch queue that resumes a
    /// checked continuation once the deadline has passed.
    ///
    /// - Parameter time: How long to wait. NaN and non-positive durations use a
    ///   deadline of "now"; durations too large to express as an `Int` number of
    ///   nanoseconds (including infinity) are clamped to `Int.max` nanoseconds.
    static func sleep(for time: Time) async {
        typealias Continuation = CheckedContinuation<Void, Never>

        // `Int(_:)` traps when handed NaN, an infinity, or a value outside the
        // representable range, so clamp before converting.
        let requestedNanoseconds = time.seconds * 1e9
        let nanoseconds: Int
        if requestedNanoseconds.isNaN || requestedNanoseconds <= 0 {
            nanoseconds = 0
        } else if requestedNanoseconds >= Double(Int.max) {
            nanoseconds = Int.max
        } else {
            nanoseconds = Int(requestedNanoseconds)
        }

        let deadline = DispatchTime.now().advanced(by: .nanoseconds(nanoseconds))
        await withCheckedContinuation { (continuation: Continuation) in
            DispatchQueue.main.asyncAfter(deadline: deadline) {
                // Resumed exactly once, when the main queue runs this block.
                continuation.resume()
            }
        }
    }
}
/// A value that holds exactly one of two alternatives.
enum Either<First, Second> {
    /// The alternative carrying a `First` value.
    case first(First)
    /// The alternative carrying a `Second` value.
    case second(Second)
}
// MARK: - API
/// Starts both operations concurrently and returns as soon as one of them finishes.
///
/// The returned value carries the winner's result together with the handle of the
/// other task, which has not yet been declared the winner.
///
/// Cancellation is not signaled to the uncompleted task, but the caller can do so
/// through the returned handle.
///
/// - Parameters:
///   - first: The operation whose win is reported as `.first`.
///   - second: The operation whose win is reported as `.second`.
/// - Returns: `.first` with the first operation's result and the second task's
///   handle, or `.second` with the second operation's result and the first task's handle.
func raceTasks<First, Second>(
    _ first: @Sendable @escaping () async -> First,
    and second: @Sendable @escaping () async -> Second
) async -> Either<
    (First, Task.Handle<Second, Never>),
    (Second, Task.Handle<First, Never>)
> {
    // Each call gets its own actor to coordinate that race.
    let coordinator = RacerActor<First, Second>()
    let outcome = await coordinator.race(first: first, second: second)
    return outcome
}
// MARK: - Implementation
/// Coordinates a race between two detached tasks and reports the first one to finish.
///
/// Both tasks report their result back to this actor; the first report to arrive
/// claims the win, and the packaged result is delivered to `race(first:second:)`
/// through `observer`.
fileprivate actor RacerActor<First, Second> {
    /// The winner's value paired with the handle of the other task.
    typealias PackagedResult = Either<
        (First, Task.Handle<Second, Never>),
        (Second, Task.Handle<First, Never>)
    >

    private enum State {
        case running(Task.Handle<First, Never>, Task.Handle<Second, Never>)
        case firstWon(First, Task.Handle<Second, Never>)
        case secondWon(Second, Task.Handle<First, Never>)

        /// Tries to claim a win for the first task. It returns `true` if it won and `false` if it didn't.
        mutating func claimWin(_ value: First) -> Bool {
            guard case let .running(_, secondHandle) = self else {
                return false
            }
            self = .firstWon(value, secondHandle)
            return true
        }

        /// Tries to claim a win for the second task. It returns `true` if it won and `false` if it didn't.
        mutating func claimWin(_ value: Second) -> Bool {
            guard case let .running(firstHandle, _) = self else {
                return false
            }
            self = .secondWon(value, firstHandle)
            return true
        }

        /// Packages the result after a task has won.
        func packageResult() -> PackagedResult {
            switch self {
            case .firstWon(let first, let secondHandle):
                return .first((first, secondHandle))
            case .secondWon(let second, let firstHandle):
                return .second((second, firstHandle))
            case .running:
                fatalError("No results are available to package until a task wins.")
            }
        }
    }

    // We need a way to signal completion from a detached task. The reason
    // we need a detached task, in the first place, is that a racer returns only when
    // *one* task is finished, whereas a task group requires *all* tasks to finish.
    private let observer = YieldingContinuation<PackagedResult, Never>()

    // State is force unwrapped because we need a reference to `self`
    // when creating the tasks, which are needed to properly initialize state.
    //
    // The result is deliberately NOT published from a property observer: calling a
    // mutating method on an observed property triggers the observer on write-back
    // even when the method leaves the value untouched, so the losing task's failed
    // claim would package and yield the winner's result a second time.
    private var state: State! = nil

    /// Records a win for the first task and publishes the result if no task has won yet.
    private func claimWin(_ value: First) -> Bool {
        guard state.claimWin(value) else {
            return false
        }
        publishResult()
        return true
    }

    /// Records a win for the second task and publishes the result if no task has won yet.
    private func claimWin(_ value: Second) -> Bool {
        guard state.claimWin(value) else {
            return false
        }
        publishResult()
        return true
    }

    /// Sends the packaged result to the observer. Must only be called after a successful claim.
    private func publishResult() {
        _ = observer.yield(state.packageResult())
    }

    init() {}

    /// Starts both operations in detached tasks and returns once the first of them has claimed the win.
    ///
    /// - Parameters:
    ///   - first: The operation whose win is reported as `.first`.
    ///   - second: The operation whose win is reported as `.second`.
    /// - Returns: The winner's value together with the other task's handle.
    func race(
        first: @Sendable @escaping () async -> First,
        second: @Sendable @escaping () async -> Second
    ) async -> PackagedResult {
        // Declare the handlers. `self` is captured weakly, so a task that finishes
        // after the actor is gone simply skips its claim.
        let firstHandle = detach { [weak self] () -> First in
            let result = await first()
            _ = await self?.claimWin(result)
            return result
        }
        let secondHandle = detach { [weak self] () -> Second in
            let result = await second()
            _ = await self?.claimWin(result)
            return result
        }

        // Set the state before we await for the state-dependent tasks.
        state = .running(firstHandle, secondHandle)

        async {
            // We use `async let` so that the tasks run concurrently and not sequentially.
            async let firstResult = firstHandle.get()
            async let secondResult = secondHandle.get()
            _ = await (firstResult, secondResult)
        }

        // Wait for the observer's message.
        return await observer.next()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment