Skip to content

Instantly share code, notes, and snippets.

@tcldr
Last active April 23, 2023 12:48
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tcldr/afa8eccb5119027c3a5327c84e52d7f3 to your computer and use it in GitHub Desktop.
Save tcldr/afa8eccb5119027c3a5327c84e52d7f3 to your computer and use it in GitHub Desktop.
Swift Concurrency Utilities for managing Transactional Resources
// A simple serial queue. Every operation sent through the queue will be executed
// in first-in first-out order. Every operation will complete its execution fully,
// even if it includes suspension points, prior to the next operation commencing.
//
// TIP: If calling from a global actor like @MainActor annotate your closure to
// get better ergonomics: `queue.send { @MainActor in /* ... */ }`
public final class AsyncSerialQueue: Sendable {
public typealias Operation = @Sendable () async -> Void
public typealias Continuation = AsyncStream<Operation>.Continuation
public typealias YieldResult = Continuation.YieldResult
private let continuation: AsyncStream<Operation>.Continuation
private let task: Task<Void, Error>
public init(bufferingPolicy: Continuation.BufferingPolicy = .unbounded) {
let (operations, continuation) = AsyncStream.withExtractedContinuation(
Operation.self, bufferingPolicy: bufferingPolicy)
self.continuation = continuation
self.task = Task {
for await operation in operations {
try Task.checkCancellation()
await operation()
}
}
}
deinit {
// Cancels the Task prior to commencement of next operation.
task.cancel()
// In the case that the operation queue has no pending operations, checkCancellation
// won't be called. Send through an empty operation to handle this case.
continuation.yield { }
}
@discardableResult
public func send(_ operation: @escaping Operation) -> YieldResult {
continuation.yield(operation)
}
}
// An AsyncReactor. A reactor can be used to protect a transactional resource where
// every operation must be carried out in sequence. The send closure includes a
// parameter for the protected resource through which operations on the protected
// resource can be performed.
//
// TIP: If calling from a global actor like @MainActor annotate your closure to
// get better ergonomics: `reactor.send { @MainActor resource in /* ... */ }`
public final class AsyncReactor<Resource: Sendable>: Sendable {
public typealias Operation = @Sendable (Resource) async -> Void
public typealias Continuation = AsyncStream<Operation>.Continuation
public typealias YieldResult = Continuation.YieldResult
private let continuation: AsyncStream<Operation>.Continuation
private let task: Task<Void, Error>
public init(resource: Resource, bufferingPolicy: Continuation.BufferingPolicy = .unbounded) {
let (operations, continuation) = AsyncStream.withExtractedContinuation(
Operation.self, bufferingPolicy: bufferingPolicy)
self.continuation = continuation
self.task = Task {
for await operation in operations {
try Task.checkCancellation()
await operation(resource)
}
}
}
deinit {
// Cancels the Task prior to commencement of next operation.
task.cancel()
// In the case that the operation queue has no pending operations, checkCancellation
// won't be called. Send through an empty operation to handle this case.
continuation.yield { _ in }
}
@discardableResult
public func send(_ operation: @escaping Operation) -> YieldResult {
continuation.yield(operation)
}
}
// AsyncStream addition that acts as the basis for the prebious utilities. It breaks
// apart an AsyncStream from its continuation as a way to send items in series via
// a non-async, non-blocking call from outside the initialiser of an AsyncStream.
public extension AsyncStream {
static func withExtractedContinuation(
_ type: Element.Type,
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy
) -> (AsyncStream<Element>, AsyncStream<Element>.Continuation) {
var extractedContinuation: AsyncStream<Element>.Continuation! = nil
let stream = AsyncStream(Element.self, bufferingPolicy: bufferingPolicy) { continuation in
extractedContinuation = continuation
}
return (stream, extractedContinuation)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment