Using CheckedContinuation to communicate between concurrent tasks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the code for https://forums.swift.org/t/communicating-between-two-concurrent-tasks/54240 | |
import _Concurrency | |
actor Buffer { | |
var elements: [Int] = [] | |
private var isNotEmpty: CheckedContinuation<Void, Never>? = nil | |
deinit { | |
// TODO: If the continuation is not nil, | |
// resume it by throwing cancellation error? | |
} | |
func enqueue(_ value: Int) { | |
elements.append(value) | |
// Signal dequeue if needed. | |
isNotEmpty?.resume() | |
isNotEmpty = nil | |
} | |
func dequeue() async -> Int { | |
while true { | |
if !elements.isEmpty { | |
return elements.removeFirst() | |
} else { | |
// Suspend until another element is enqueued. | |
print("dequeue waiting") | |
await withCheckedContinuation { cont in | |
self.isNotEmpty = cont | |
} | |
} | |
} | |
} | |
} | |
Task { | |
let buffer = Buffer() | |
// Producer task | |
Task { | |
await buffer.enqueue(1) | |
try! await Task.sleep(nanoseconds: 400_000_000) | |
await buffer.enqueue(2) | |
await buffer.enqueue(3) | |
} | |
// Consumer task | |
Task { | |
while true { | |
let next = await buffer.dequeue() | |
print(next) | |
try! await Task.sleep(nanoseconds: 100_000_000) | |
} | |
} | |
} | |
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment