Last active
December 6, 2021 07:55
-
-
Save filip-sakel/605e3b4b10025cdae3fffd114ee56c7d to your computer and use it in GitHub Desktop.
A Swift implementation of a concurrency structure that safely races two tasks against each other. This structure is useful for implementing operators like `debounce` for `AsyncSequence`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// TaskRacing.swift
//
// Created by Filip Sakel on 21/6/21.
//
import Dispatch
// MARK: - Utilities
/// A duration expressed as a (possibly fractional) number of seconds.
struct Time {
    /// The length of the duration, in seconds.
    let seconds: Double
}
extension Task {
    /// Suspends the current task for approximately the given duration.
    ///
    /// This is a stand-in for the built-in sleep, which doesn't work properly
    /// on the Xcode 13 beta 1 toolchain.
    ///
    /// - Parameter time: How long to wait. NaN and negative durations resume
    ///   as soon as possible; durations are capped at 1e18 nanoseconds so the
    ///   conversion to `Int` can never trap.
    static func sleep(for time: Time) async {
        typealias Continuation = CheckedContinuation<Void, Never>

        // Converting a NaN, infinite or out-of-range `Double` to `Int` traps,
        // so clamp the requested delay into a safely representable range first.
        let requested = time.seconds * 1e9
        let clamped = requested.isNaN ? 0 : min(max(requested, 0), 1e18)
        let deadline = DispatchTime.now().advanced(by: .nanoseconds(Int(clamped)))

        await withCheckedContinuation { (continuation: Continuation) in
            // A global queue is used so the timer does not depend on the main
            // queue being serviced; the continuation is resumed exactly once.
            DispatchQueue.global().asyncAfter(deadline: deadline) {
                continuation.resume()
            }
        }
    }
}
/// A value that is exactly one of two alternatives.
enum Either<First, Second> {
    /// The first alternative.
    case first(First)
    /// The second alternative.
    case second(Second)
}
// MARK: - API
/// Runs the two given operations concurrently and returns as soon as one of them wins.
///
/// The result carries the winner's value together with the handle of the other,
/// still uncompleted task.
///
/// Cancellation is not signaled to the uncompleted task automatically, but the
/// caller can cancel it through the returned handle.
///
/// - Parameters:
///   - first: The first operation to race.
///   - second: The second operation to race.
/// - Returns: `.first` with the first operation's value and the second task's handle
///   if the first operation won; otherwise `.second` with the second operation's
///   value and the first task's handle.
func raceTasks<First, Second>(
    _ first: @Sendable @escaping () async -> First,
    and second: @Sendable @escaping () async -> Second
) async -> Either<
    (First, Task.Handle<Second, Never>),
    (Second, Task.Handle<First, Never>)
> {
    // Each race gets its own actor, so races never share state.
    let racer = RacerActor<First, Second>()
    return await racer.race(first: first, second: second)
}
// MARK: - Implementation
/// Coordinates a single race between two detached tasks.
///
/// The actor serializes the "who finished first" decision: both tasks report
/// their result through `claimWin`, and only the first report is accepted.
fileprivate actor RacerActor<First, Second> {
    /// The winner's value paired with the handle of the task that lost.
    typealias PackagedResult = Either<
        (First, Task.Handle<Second, Never>),
        (Second, Task.Handle<First, Never>)
    >

    /// The progress of the race.
    private enum State {
        case running(Task.Handle<First, Never>, Task.Handle<Second, Never>)
        case firstWon(First, Task.Handle<Second, Never>)
        case secondWon(Second, Task.Handle<First, Never>)

        /// Tries to claim a win for the first task. It returns `true` if it won and `false` if it didn't.
        mutating func claimWin(_ value: First) -> Bool {
            guard case let .running(_, secondHandle) = self else {
                return false
            }
            self = .firstWon(value, secondHandle)
            return true
        }

        /// Tries to claim a win for the second task. It returns `true` if it won and `false` if it didn't.
        mutating func claimWin(_ value: Second) -> Bool {
            guard case let .running(firstHandle, _) = self else {
                return false
            }
            self = .secondWon(value, firstHandle)
            return true
        }

        /// Packages the result after a task has won.
        func packageResult() -> PackagedResult {
            switch self {
            case .firstWon(let first, let secondHandle):
                return .first((first, secondHandle))
            case .secondWon(let second, let firstHandle):
                return .second((second, firstHandle))
            case .running:
                fatalError("No results are available to package until a task wins.")
            }
        }
    }

    // We need a way to signal completion from a detached task. The reason
    // we need a detached task, in the first place, is that a racer returns only when
    // *one* task is finished, whereas a task group requires *all* tasks to finish.
    private let observer = YieldingContinuation<PackagedResult, Never>()

    // `nil` until `race` has created both tasks: the tasks need a reference to
    // `self`, and the running state needs the task handles.
    private var state: State?

    /// Records a win for the first task and publishes the result.
    /// Returns `false`, publishing nothing, if the race was already decided.
    private func claimWin(_ value: First) -> Bool {
        guard state?.claimWin(value) == true else { return false }
        publishResult()
        return true
    }

    /// Records a win for the second task and publishes the result.
    /// Returns `false`, publishing nothing, if the race was already decided.
    private func claimWin(_ value: Second) -> Bool {
        guard state?.claimWin(value) == true else { return false }
        publishResult()
        return true
    }

    /// Hands the packaged result to the observer. Only called right after a
    /// successful claim, so a losing task never re-publishes a stale result.
    private func publishResult() {
        guard let decided = state else { return }
        _ = observer.yield(decided.packageResult())
    }

    init() {}

    /// Starts both operations and returns once the first of them finishes.
    ///
    /// - Parameters:
    ///   - first: The first operation to race.
    ///   - second: The second operation to race.
    /// - Returns: The winner's value paired with the handle of the other task.
    func race(
        first: @Sendable @escaping () async -> First,
        second: @Sendable @escaping () async -> Second
    ) async -> PackagedResult {
        // Declare the handlers.
        let firstHandle = detach { [weak self] () -> First in
            let result = await first()
            _ = await self?.claimWin(result)
            return result
        }

        let secondHandle = detach { [weak self] () -> Second in
            let result = await second()
            _ = await self?.claimWin(result)
            return result
        }

        // Set the state before we await for the state-dependent tasks.
        state = .running(firstHandle, secondHandle)

        async {
            // We use `async let` so that our tasks run concurrently and not sequentially.
            async let firstResult = firstHandle.get()
            async let secondResult = secondHandle.get()
            _ = await (firstResult, secondResult)
        }

        // Wait for the observer's message.
        return await observer.next()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment