Skip to content

Instantly share code, notes, and snippets.

@ABridoux
Last active October 13, 2023 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ABridoux/43a4c1dab95f80cd1cdbb0905d628de3 to your computer and use it in GitHub Desktop.
Save ABridoux/43a4c1dab95f80cd1cdbb0905d628de3 to your computer and use it in GitHub Desktop.
Async queue to enqueue elements and consume them in an `AsyncSequence`.
// MARK: - Queue
/// A first-in, first-out queue backed by two arrays.
///
/// New elements are appended to `enqueueArray`; `dequeue()` serves elements
/// from the end of `dequeueArray`, which is refilled from `enqueueArray` in
/// reversed order whenever it runs dry.
struct Queue<Element> {
    /// Elements added since the last refill, in insertion order.
    private var enqueueArray: [Element] = []
    /// Elements ready to be served; the next one to dequeue is the last.
    private var dequeueArray: [Element] = []

    /// `true` when neither backing array holds an element.
    var isEmpty: Bool {
        guard enqueueArray.isEmpty else { return false }
        return dequeueArray.isEmpty
    }
}
extension Queue {
    /// Adds `element` at the back of the queue.
    mutating func enqueue(_ element: Element) {
        enqueueArray.append(element)
    }

    /// Removes and returns the element at the front of the queue.
    ///
    /// - Returns: The oldest element, or `nil` when the queue is empty.
    mutating func dequeue() -> Element? {
        // Fast path: something is already staged for dequeuing.
        if let staged = dequeueArray.popLast() {
            return staged
        }
        // Nothing staged: move every pending element over, reversed so that
        // the oldest one ends up last and can be popped cheaply.
        dequeueArray = enqueueArray.reversed()
        enqueueArray.removeAll()
        return dequeueArray.popLast()
    }
}
extension Queue {
    /// Discards every element, both pending and staged for dequeuing.
    mutating func removeAll() {
        dequeueArray = []
        enqueueArray = []
    }
}
// MARK: - AsyncQueue
/// A FIFO queue whose elements are consumed asynchronously.
///
/// Producers call `enqueue(_:)`; a consumer iterates `elements` with
/// `for await` (or calls `next()` directly). When the queue is empty, `next()`
/// suspends until an element is enqueued or the waiting task is cancelled.
/// Only one suspended `next()` call is supported at a time.
actor AsyncQueue<Element> {
    /// Elements enqueued while no consumer was waiting.
    private var queue = Queue<Element>()
    /// Continuation of the consumer currently suspended in `next()`, if any.
    /// Resumed with an element by `enqueue(_:)`, or with `nil` on cancellation.
    private var nextContinuation: CheckedContinuation<Element?, Never>?
    /// Identifies the most recent wait started by `next()`, so that a
    /// cancellation notice arriving late cannot end a newer wait.
    private var waitGeneration: UInt64 = 0
    /// An `AsyncSequence` view over this queue.
    nonisolated var elements: Elements { Elements(queue: self) }
}

extension AsyncQueue {
    /// Delivers `element` to the waiting consumer if there is one, otherwise
    /// stores it until `next()` is called.
    func enqueue(_ element: Element) {
        if let waiting = nextContinuation {
            // Clear the slot before resuming so the continuation can never be
            // resumed a second time.
            nextContinuation = nil
            waiting.resume(returning: element)
        } else {
            queue.enqueue(element)
        }
    }

    /// Ends the wait identified by `generation` by resuming it with `nil`.
    /// Does nothing when that wait has already finished or been superseded.
    private func cancelWait(generation: UInt64) {
        guard generation == waitGeneration, let waiting = nextContinuation else { return }
        nextContinuation = nil
        waiting.resume(returning: nil)
    }
}

// MARK: - AsyncIteratorProtocol
extension AsyncQueue: AsyncIteratorProtocol {
    /// Returns the next element, suspending while the queue is empty.
    ///
    /// - Returns: The next element in FIFO order, or `nil` when another call
    ///   is already suspended waiting for an element, or when the calling task
    ///   is cancelled while the queue is empty.
    func next() async -> Element? {
        // A second concurrent waiter is not supported: end its iteration.
        guard nextContinuation == nil else { return nil }
        if let element = queue.dequeue() {
            return element
        }

        waitGeneration &+= 1
        let generation = waitGeneration
        return await withTaskCancellationHandler {
            await withCheckedContinuation { (continuation: CheckedContinuation<Element?, Never>) in
                if Task.isCancelled {
                    // Already cancelled: finish right away instead of parking
                    // a continuation that nobody may ever resume.
                    continuation.resume(returning: nil)
                } else {
                    nextContinuation = continuation
                }
            }
        } onCancel: {
            // `onCancel` runs outside the actor; hop back in to resume the
            // parked continuation with `nil`.
            Task { await self.cancelWait(generation: generation) }
        }
    }
}
// MARK: - Elements
extension AsyncQueue {
    /// `AsyncSequence` wrapper around an `AsyncQueue`.
    ///
    /// The queue acts as its own iterator, so every iterator made from this
    /// sequence is the same underlying queue instance.
    struct Elements: AsyncSequence {
        typealias AsyncIterator = AsyncQueue

        /// The queue whose elements are produced by the sequence.
        let queue: AsyncQueue

        /// Returns the wrapped queue, which serves as the iterator.
        func makeAsyncIterator() -> AsyncIterator {
            return queue
        }
    }
}
@ABridoux
Copy link
Author

ABridoux commented Oct 11, 2023

Article

Related article on Woody's Findings.

Usage example

let queue = AsyncQueue<String>()

// Consumer: prints each element as it arrives, pausing one second after each.
Task {
    for await letter in queue.elements {
        print(letter)
        try await Task.sleep(for: .seconds(1))
    }
}

// Producer: enqueues "A" then "B".
Task {
    for letter in ["A", "B"] {
        await queue.enqueue(letter)
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment