Skip to content

Instantly share code, notes, and snippets.

@ole
Last active December 23, 2021 13:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ole/b7872efc1824bfdc6da1e414d957b33f to your computer and use it in GitHub Desktop.
Save ole/b7872efc1824bfdc6da1e414d957b33f to your computer and use it in GitHub Desktop.
Using CheckedContinuation to communicate between two concurrent tasks.
import _Concurrency
/// A one-slot rendezvous point between two concurrent tasks.
///
/// One task suspends in `receive()`; another task wakes it up by calling
/// `send(_:)`. The channel does not buffer values: a value sent while nobody
/// is receiving is dropped.
actor Channel<Output> {
    /// Continuation of the currently suspended `receive()` call, if any.
    private var pendingReceiver: CheckedContinuation<Output, Never>?

    deinit {
        // TODO: decide what to do with a receiver that is still pending here.
        // Resuming it by throwing `CancellationError()` is not possible as
        // written, because the stored continuation is non-throwing (`Never`).
    }

    /// Hands `value` to the suspended receiver and clears the receive state.
    ///
    /// If there's no receiver, the sent value will be lost.
    func send(_ value: Output) {
        guard let receiver = pendingReceiver else { return }
        pendingReceiver = nil
        receiver.resume(returning: value)
    }

    /// Suspends the caller until the next `send(_:)` and returns the sent value.
    ///
    /// - Precondition: Only one `receive` may be active at a time.
    ///   Calling `send` will reset the receive state.
    func receive() async -> Output {
        precondition(pendingReceiver == nil, "Called receive() a second time while the first is still active")
        let received: Output = await withCheckedContinuation { continuation in
            pendingReceiver = continuation
        }
        return received
    }
}
/// A bounded FIFO buffer shared between producer and consumer tasks.
///
/// Producers suspend in `append(_:)` while the buffer is full; consumers
/// suspend in `next()` while it is empty.
///
/// The continuations of suspended callers are stored inside this actor. The
/// closure passed to `withCheckedContinuation` runs synchronously before the
/// caller suspends, so there is no suspension point between checking the
/// buffer state and registering as a waiter. Handing the wait off to a second
/// actor would open such a window (actors are reentrant at every `await`), and
/// a wake-up signal sent inside that window would be lost, leaving the waiter
/// suspended forever.
///
/// Any number of producers and consumers may call in concurrently.
/// Task cancellation is not observed: a suspended caller stays suspended until
/// the buffer state changes.
actor Buffer<Element> {
    /// Maximum number of elements the buffer holds at once.
    let bufferSize: Int
    /// Buffered elements, oldest first.
    private var elements: [Element]
    /// Producers suspended in `append(_:)` because the buffer was full, oldest first.
    private var waitingProducers: [CheckedContinuation<Void, Never>] = []
    /// Consumers suspended in `next()` because the buffer was empty, oldest first.
    private var waitingConsumers: [CheckedContinuation<Void, Never>] = []

    /// Creates an empty buffer that holds at most `size` elements.
    ///
    /// - Precondition: `size > 0`. With a non-positive size `append(_:)` could
    ///   never complete.
    init(size: Int) {
        precondition(size > 0, "Buffer size must be positive")
        self.bufferSize = size
        var elements: [Element] = []
        elements.reserveCapacity(size)
        self.elements = elements
    }

    /// Appends `value`, suspending for as long as the buffer is full.
    func append(_ value: Element) async {
        // Re-check after every wake-up: another producer may have taken the
        // free slot between the wake-up and this task actually running.
        while elements.count >= bufferSize {
            // Wait until there's room in the buffer.
            print("producer waiting for buffer to empty")
            await withCheckedContinuation { continuation in
                waitingProducers.append(continuation)
            }
        }
        elements.append(value)
        // One new element can satisfy at most one consumer.
        if !waitingConsumers.isEmpty {
            waitingConsumers.removeFirst().resume()
        }
    }

    /// Removes and returns the oldest element, suspending for as long as the
    /// buffer is empty.
    ///
    /// - Returns: The oldest buffered element. The current implementation has
    ///   no end-of-stream state and never returns `nil`; the optional return
    ///   type is kept so existing `while let` callers continue to compile.
    func next() async -> Element? {
        // Re-check after every wake-up: another consumer may have taken the
        // element between the wake-up and this task actually running.
        while elements.isEmpty {
            // Wait until there's a new element in the buffer.
            print("consumer waiting for buffer to fill")
            await withCheckedContinuation { continuation in
                waitingConsumers.append(continuation)
            }
        }
        let element = elements.removeFirst()
        // One freed slot can satisfy at most one producer.
        if !waitingProducers.isEmpty {
            waitingProducers.removeFirst().resume()
        }
        return element
    }
}
// Demo: one consumer and one producer share a buffer with room for two elements.
Task {
    let sharedBuffer = Buffer<Int>(size: 2)

    // Consumer task: prints every element it receives, pausing 0.1 s after each.
    Task {
        while true {
            guard let element = await sharedBuffer.next() else { break }
            print(element)
            try! await Task.sleep(nanoseconds: 100_000_000)
        }
    }

    // Producer task: sends one element, pauses 0.4 s, then sends four more
    // back to back.
    Task {
        await sharedBuffer.append(1)
        try! await Task.sleep(nanoseconds: 400_000_000)
        for value in 2...5 {
            await sharedBuffer.append(value)
        }
    }
}
import PlaygroundSupport
// Ask the playground to keep executing after the top-level code has finished,
// so the asynchronous tasks started above get a chance to run.
PlaygroundPage.current.needsIndefiniteExecution = true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment