Skip to content

Instantly share code, notes, and snippets.

@groue
Created February 14, 2024 13:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save groue/5877ab8a0cbc119f45be3db114cf5ed3 to your computer and use it in GitHub Desktop.
Save groue/5877ab8a0cbc119f45be3db114cf5ed3 to your computer and use it in GitHub Desktop.
A Swift concurrency queue that I find useful when dealing with synchronization.
import Semaphore
/// Describes how an operation started by `SynchronizationQueue` interacts
/// with the operations started after it.
enum SynchronizationPriority {
    /// Never cancelled by operations started afterwards.
    case required

    /// Meant for "pull-to-refresh": cancelled by any later operation whose
    /// priority is `refresh` or `required`.
    case refresh

    /// Meant for faceless synchronizations: cancelled by every later
    /// sync operation, whatever its priority.
    case background
}
/// An actor that serializes synchronization operations.
///
/// Usage:
///
/// ```swift
/// let queue = SynchronizationQueue()
///
/// // Wait until sync operation has completed.
/// let value = try await queue.perform(priority: .required) {
///     try await someValue()
/// }
///
/// // Sync and forget.
/// await queue.performAndForget(priority: .background) {
///     _ = try await someValue()
/// }
/// ```
///
/// Sync operations are started with a priority, one of:
///
/// - `required`: the operation is not cancelled by subsequent operations.
///
/// - `refresh`: intended for "pull-to-refresh". The operation is cancelled
///   by subsequent operations started with the `refresh` or
///   `required` priorities.
///
/// - `background`: intended for faceless synchronizations. The operation
///   is cancelled by all subsequent operations.
actor SynchronizationQueue {
    typealias OperationTask = Task<Void, Never>

    /// A task registered for cancellation, tagged with the identifier of
    /// the operation that started it. The identifier lets a finishing task
    /// tell whether the registry slot still belongs to it.
    private struct Registration {
        let id: UInt64
        let task: OperationTask
    }

    /// Guarantees that at most one operation runs at a time.
    private let semaphore = AsyncSemaphore(value: 1)

    /// The most recently started task for each priority, if it has not
    /// been cancelled by a subsequent operation or completed yet.
    private var registeredTasks: [SynchronizationPriority: Registration] = [:]

    /// The identifier given to the most recently started operation.
    private var lastOperationID: UInt64 = 0

    init() { }

    /// Returns the result of the given operation.
    ///
    /// Previous operations may be cancelled, depending on `priority`.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - didStartTask: (Intended for testing): a closure that is called
    ///     after previous operations were cancelled (if needed), and the
    ///     task for the new operation is started and registered, ready to
    ///     be cancelled by a subsequent operation.
    ///   - operation: An async closure that performs the synchronization.
    /// - Returns: The result of `operation`
    /// - Throws: The error of `operation`, or `CancellationError` if the
    ///   operation was cancelled by a subsequent operation.
    func perform<Success: Sendable>(
        priority: SynchronizationPriority,
        didStartTask: @Sendable (OperationTask) -> Void = { _ in },
        operation: @escaping @Sendable () async throws -> Success
    ) async throws -> Success {
        try await withCheckedThrowingContinuation { continuation in
            // `completion` is called exactly once by `synchronize`, so the
            // continuation is resumed exactly once.
            let task = synchronize(
                priority: priority,
                operation: operation,
                completion: { result in
                    continuation.resume(with: result)
                })
            didStartTask(task)
        }
    }

    /// Schedules the given operation for asynchronous execution, and
    /// returns immediately.
    ///
    /// Previous operations may be cancelled, depending on `priority`.
    /// Errors thrown by `operation` are ignored.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - operation: An async closure that performs the synchronization.
    /// - Returns: A top-level task that runs the operation.
    @discardableResult
    func performAndForget(
        priority: SynchronizationPriority,
        _ operation: @escaping @Sendable () async throws -> Void
    ) -> OperationTask {
        synchronize(
            priority: priority,
            operation: operation,
            completion: { _ in })
    }

    /// Cancels previous operations as required by `priority`, then starts
    /// and registers a task that runs `operation` once the previously
    /// started operations have released the semaphore.
    ///
    /// - Parameters:
    ///   - priority: The priority of the operation.
    ///   - operation: An async closure that performs the synchronization.
    ///   - completion: A closure called exactly once with the result of
    ///     `operation`, or with a failure if the task was cancelled.
    /// - Returns: The task that runs the operation.
    private func synchronize<Success: Sendable>(
        priority: SynchronizationPriority,
        operation: @escaping @Sendable () async throws -> Success,
        completion: @escaping @Sendable (Result<Success, any Error>) -> Void
    ) -> OperationTask {
        // This method is synchronous. This is how we are sure that all
        // started tasks are registered and can be cancelled by subsequent
        // calls to this method.

        // Cancel previous tasks if needed.
        cancelTasks(cancelledBy: priority)

        // Identify this operation, so that the task only ever unregisters
        // itself, and never a task registered by a later operation.
        lastOperationID &+= 1
        let operationID = lastOperationID

        // Start a task
        let task = Task {
            defer {
                // Task has completed: undo registerTask
                unregisterTask(withID: operationID, for: priority)
            }

            do {
                // Serialization of sync operations is guaranteed by the
                // semaphore, which also catches early cancellations
                // performed after the actor method returns but before the
                // semaphore has started waiting.
                try await semaphore.waitUnlessCancelled()
                defer { semaphore.signal() }

                let value = try await operation()

                // Operation has completed successfully. Just in case it
                // would have caught `CancellationError`, let's check for
                // cancellation again. This guarantees that the caller
                // always sees a `CancellationError`, whenever cancellation
                // happens before, during, or after `operation` execution.
                try Task.checkCancellation()
                completion(.success(value))
            } catch {
                completion(.failure(error))
            }
        }

        // Register the task so that subsequent operations can cancel it.
        registerTask(task, withID: operationID, for: priority)
        return task
    }

    /// Cancels the registered tasks that an operation started with
    /// `priority` is allowed to cancel.
    private func cancelTasks(cancelledBy priority: SynchronizationPriority) {
        switch priority {
        case .required, .refresh:
            cancelTask(for: .refresh)
            cancelTask(for: .background)
        case .background:
            cancelTask(for: .background)
        }
    }

    /// Records `task` as the current task for `priority`, replacing any
    /// previous registration for that priority.
    private func registerTask(
        _ task: OperationTask,
        withID id: UInt64,
        for priority: SynchronizationPriority
    ) {
        registeredTasks[priority] = Registration(id: id, task: task)
    }

    /// Removes and cancels the task registered for `priority`, if any.
    private func cancelTask(for priority: SynchronizationPriority) {
        if let registration = registeredTasks.removeValue(forKey: priority) {
            registration.task.cancel()
        }
    }

    /// Releases the registration of a completed task.
    ///
    /// The registration is only removed when it still belongs to the
    /// operation identified by `id`. If another operation has cancelled
    /// and replaced the completed task, the registry holds that other
    /// operation's task, which must be left alone so that it can be
    /// cancelled in its turn. If the user has independently cancelled the
    /// completed task, its registration is still there and is released.
    private func unregisterTask(withID id: UInt64, for priority: SynchronizationPriority) {
        guard registeredTasks[priority]?.id == id else { return }
        registeredTasks[priority] = nil
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment