Created
February 14, 2024 13:12
-
-
Save groue/5877ab8a0cbc119f45be3db114cf5ed3 to your computer and use it in GitHub Desktop.
A Swift concurrency queue that I find useful when dealing with synchronization.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Semaphore
/// The priority of an operation started by `SynchronizationQueue`.
///
/// The priority of a new operation decides which previously started
/// operations get cancelled when the new one is scheduled.
enum SynchronizationPriority {
    /// An operation with `required` priority is not cancelled by
    /// subsequent operations.
    case required

    /// Intended for "pull-to-refresh": an operation with `refresh` priority
    /// is cancelled by subsequent operations started with the `refresh` or
    /// `required` priorities.
    case refresh

    /// Intended for faceless synchronizations: an operation with
    /// `background` priority is cancelled by all subsequent
    /// sync operations.
    case background
}
/// An actor that serializes synchronization operations.
///
/// Every operation runs in its own unstructured task. A one-slot semaphore
/// makes sure operations execute one after the other, and starting a new
/// operation may cancel operations that were started earlier, depending on
/// the priority of the new operation.
///
/// Usage:
///
/// ```swift
/// let queue = SynchronizationQueue()
///
/// // Wait until sync operation has completed.
/// let value = try await queue.perform(priority: .required) {
///     try await someValue()
/// }
///
/// // Sync and forget.
/// await queue.performAndForget(priority: .background) {
///     _ = try await someValue()
/// }
/// ```
///
/// Sync operations are started with a priority, one of:
///
/// - `required`: the operation is not cancelled by subsequent operations.
///
/// - `refresh`: intended for "pull-to-refresh". The operation is cancelled
///   by subsequent operations started with the `refresh` or
///   `required` priorities.
///
/// - `background`: intended for faceless synchronizations. The operation
///   is cancelled by all subsequent operations.
actor SynchronizationQueue {
    typealias OperationTask = Task<Void, Never>

    /// A task stored for eventual cancellation, together with the unique
    /// identifier that lets the task recognize its own registration when
    /// it completes.
    private struct Registration {
        let id: UInt64
        let task: OperationTask
    }

    /// Serializes operations: one slot, so one operation at a time.
    private let semaphore = AsyncSemaphore(value: 1)

    /// The latest task registered for each priority.
    private var registeredTasks: [SynchronizationPriority: Registration] = [:]

    /// The identifier given to the most recent registration.
    private var lastRegistrationID: UInt64 = 0

    init() { }

    /// Returns the result of the given operation.
    ///
    /// Previous operations are eventually cancelled, according to `priority`.
    ///
    /// The operation runs in an unstructured task: cancelling the task that
    /// awaits this method does not cancel the operation.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - didStartTask: (Intended for testing): a closure that is called
    ///     after previous operations were eventually cancelled, and the
    ///     task for the new operation is started, ready for
    ///     eventual cancellation.
    ///   - operation: An async closure that performs the synchronization.
    /// - Returns: The result of `operation`
    /// - Throws: The error of `operation`, or `CancellationError` if the
    ///   operation was cancelled by a subsequent operation.
    func perform<Success: Sendable>(
        priority: SynchronizationPriority,
        didStartTask: @Sendable (OperationTask) -> Void = { _ in },
        operation: @escaping @Sendable () async throws -> Success
    ) async throws -> Success {
        try await withCheckedThrowingContinuation { continuation in
            // `completion` is called exactly once by the operation task,
            // so the continuation is resumed exactly once.
            let task = synchronize(
                priority: priority,
                operation: operation,
                completion: { result in
                    continuation.resume(with: result)
                })
            didStartTask(task)
        }
    }

    /// Schedules the given operation for asynchronous execution, and
    /// returns immediately.
    ///
    /// Previous operations are eventually cancelled, according to `priority`.
    /// The result of `operation`, including any error, is discarded.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - operation: An async closure that performs the synchronization.
    /// - Returns: A top-level task that runs the operation.
    @discardableResult
    func performAndForget(
        priority: SynchronizationPriority,
        _ operation: @escaping @Sendable () async throws -> Void
    ) -> OperationTask {
        synchronize(
            priority: priority,
            operation: operation,
            completion: { _ in })
    }

    /// Cancels the previous operations that `priority` supersedes, then
    /// starts and registers a task that runs `operation`.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - operation: An async closure that performs the synchronization.
    ///   - completion: A closure called exactly once with the result of
    ///     `operation`, or with the error that prevented it from
    ///     completing (including `CancellationError`).
    /// - Returns: The task that runs the operation.
    private func synchronize<Success: Sendable>(
        priority: SynchronizationPriority,
        operation: @escaping @Sendable () async throws -> Success,
        completion: @escaping @Sendable (Result<Success, any Error>) -> Void
    ) -> OperationTask {
        // This method is synchronous. This is how we are sure that all
        // started tasks are registered and can be cancelled by subsequent
        // calls to this method.

        // Cancel previous tasks if needed.
        cancelTasks(cancelledBy: priority)

        // Identify the registration of the new task, so that the task only
        // ever clears its own registration when it completes.
        lastRegistrationID &+= 1
        let registrationID = lastRegistrationID

        // Start a task
        let task = OperationTask {
            defer {
                // Task has completed: undo the registration below.
                unregisterTask(id: registrationID, for: priority)
            }

            do {
                // Serialization of sync operations is guaranteed by the
                // semaphore, which also catches early cancellations
                // performed after the actor method returns but before the
                // semaphore has started waiting.
                try await semaphore.waitUnlessCancelled()
                defer { semaphore.signal() }

                let value = try await operation()

                // Operation has completed successfully. Just in case it
                // would have caught `CancellationError`, let's check for
                // cancellation again. This guarantees that the caller
                // always sees a `CancellationError`, whenever cancellation
                // happens before, during, or after `operation` execution.
                try Task.checkCancellation()

                completion(.success(value))
            } catch {
                completion(.failure(error))
            }
        }

        // Register the task for eventual cancellation
        registeredTasks[priority] = Registration(id: registrationID, task: task)
        return task
    }

    /// Cancels the registered tasks that an operation started with
    /// `priority` supersedes.
    ///
    /// Tasks registered with the `required` priority are never cancelled.
    private func cancelTasks(cancelledBy priority: SynchronizationPriority) {
        switch priority {
        case .required, .refresh:
            cancelTask(for: .refresh)
            cancelTask(for: .background)
        case .background:
            cancelTask(for: .background)
        }
    }

    /// Removes and cancels the task registered for `priority`, if any.
    private func cancelTask(for priority: SynchronizationPriority) {
        registeredTasks.removeValue(forKey: priority)?.task.cancel()
    }

    /// Releases the registration identified by `id`, if it is still the one
    /// stored for `priority`.
    ///
    /// The identifier check leaves alone any registration that belongs to
    /// another task: when the completed task was cancelled or replaced by
    /// a subsequent operation, the slot is empty or holds a newer task that
    /// must remain available for cancellation. When the slot still holds
    /// the completed task (normal completion, or cancellation performed
    /// directly on the task by the user), it is released right away.
    private func unregisterTask(id: UInt64, for priority: SynchronizationPriority) {
        guard registeredTasks[priority]?.id == id else { return }
        registeredTasks[priority] = nil
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment