Skip to content

Instantly share code, notes, and snippets.

@drewmccormack
Last active July 24, 2024 17:17
Show Gist options
  • Save drewmccormack/fc3c26f0e3fe630c7b712b90ab8089fa to your computer and use it in GitHub Desktop.
Save drewmccormack/fc3c26f0e3fe630c7b712b90ab8089fa to your computer and use it in GitHub Desktop.
Proof of concept for a serializing queue for async funcs in Swift. It orders the calls, but also ensures each call is executed to completion before the next starts, thus avoiding interleaving races which can arise in multiple calls to an async func.
/// A queue that executes async functions in order, and atomically.
/// That is, each enqueued func executes fully before the next one is started.
struct AsyncSerialQueue {
typealias Block = () async throws -> [Int]
private struct Item {
let block: Block
let continuation: CheckedContinuation<[Int], Error>
}
private var stream: AsyncStream<Item>
private var streamContinuation: AsyncStream<Item>.Continuation
init() {
var cont: AsyncStream<Item>.Continuation? = nil
let stream: AsyncStream<Item> = AsyncStream { continuation in
cont = continuation
}
self.stream = stream
self.streamContinuation = cont!
Task {
for await item in stream {
do {
let returned = try await item.block()
item.continuation.resume(returning: returned)
} catch {
item.continuation.resume(throwing: error)
}
}
}
}
func enqueue(_ block: @escaping Block) async throws -> [Int] {
try await withCheckedThrowingContinuation { continuation in
let item = Item(block: block, continuation: continuation)
streamContinuation.yield(item)
}
}
static let serializedQueue = AsyncSerialQueue()
/// Passing a function to this will execute it in the queue atomically, and in order.
static func serialized(_ wrapped: @escaping Block) async throws -> [Int] {
try await serializedQueue.enqueue(wrapped)
}
}
// Example: Deliberately introducing shared data to show an interleaving race
// when a queue is not used.
var shared: [Int] = []
/// Shoulld returrn the numbers 1 to 10 if working properly
func numbersToTen() async throws -> [Int] {
shared = []
for i in 1...10 {
shared.append(i)
try? await Task.sleep(for: .seconds(TimeInterval.random(in: 0.01...0.02)))
}
return shared
}
Task {
print("INTERLEAVING")
async let i: [Int] = try await numbersToTen()
async let j: [Int] = try await numbersToTen()
let a = try await [i, j]
print("\(a[0])")
print("\(a[1])")
print()
print("SERIALIZED")
async let k: [Int] = try await AsyncSerialQueue.serialized(numbersToTen)
async let l: [Int] = try await AsyncSerialQueue.serialized(numbersToTen)
let m = try await [k, l]
print("\(m[0])")
print("\(m[1])")
}
// Sample Output
//
// INTERLEAVING
// [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 7, 9, 8, 10, 9]
// [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 7, 9, 8, 10, 9, 10]
//
// SERIALIZED
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
//
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment