Skip to content

Instantly share code, notes, and snippets.

@Blackjacx
Last active June 17, 2024 15:47
Show Gist options
  • Save Blackjacx/82a988a14e6256bb5b1a7db3284bed14 to your computer and use it in GitHub Desktop.
Save Blackjacx/82a988a14e6256bb5b1a7db3284bed14 to your computer and use it in GitHub Desktop.
AsyncStream Non-Concurrent Demo
import UIKit
/// One element of the asynchronous, non-concurrent update stream.
///
/// Bundles everything a consumer needs to handle a single update: the
/// update's identifier and a way to ask for the stream to be wound down.
struct Update {
    /// Identifier assigned by whoever created this update.
    var id: Int

    /// Callback supplied by the creator of the update. `UpdateMonitor` passes a
    /// closure here that stops its producing loop when invoked.
    var finishStream: () -> Void
}
/// Demo producer that emits `Update` values with increasing identifiers
/// (starting at 0) to `updateHandler` until it is stopped.
///
/// The producing loop runs in an unstructured `Task`. `stop()` may be called
/// from any thread (for example from a stream's `onTermination` handler), so
/// the start/stop state is guarded by a lock and the loop is ended through
/// task cancellation rather than through an unsynchronized flag.
final class UpdateMonitor {
    /// Guards `producer` and `isStopped`.
    private let stateLock = NSLock()
    /// The task running the producing loop, if one has been started.
    private var producer: Task<Void, Never>?
    /// Set once `stop()` has been called; a stopped monitor never produces again.
    private var isStopped = false

    /// Called synchronously from the producing task for every new update.
    var updateHandler: (Update) -> Void
    /// Stored for callers that want to report failures; the producing loop
    /// itself never invokes it.
    var errorHandler: (Error) -> Void

    /// - Parameters:
    ///   - updateHandler: Receives every produced update.
    ///   - errorHandler: Receives errors; not invoked by the producing loop.
    init(updateHandler: @escaping (Update) -> Void, errorHandler: @escaping (Error) -> Void) {
        self.updateHandler = updateHandler
        self.errorHandler = errorHandler
    }

    /// Starts producing updates.
    ///
    /// Does nothing if the monitor is already running or has been stopped, so
    /// repeated calls can never create competing producers.
    func start() {
        stateLock.lock()
        defer { stateLock.unlock() }
        guard producer == nil, !isStopped else { return }

        producer = Task { [weak self] in
            var nextID = 0
            while !Task.isCancelled {
                do {
                    // Hold the monitor strongly only while emitting one update,
                    // so the loop ends as soon as the monitor is deallocated.
                    guard let self else { return }
                    let update = Update(id: nextID, finishStream: { [weak self] in
                        self?.stop()
                    })
                    nextID += 1
                    self.updateHandler(update)
                }
                // Suspension point: lets other tasks run between updates instead
                // of monopolizing a thread with a tight loop.
                await Task.yield()
            }
        }
    }

    /// Stops producing updates. Safe to call from any thread, any number of
    /// times, and before `start()`.
    func stop() {
        stateLock.lock()
        isStopped = true
        let running = producer
        producer = nil
        stateLock.unlock()

        // Cancel outside the lock; the loop observes it via `Task.isCancelled`.
        running?.cancel()
    }
}
extension UpdateMonitor {
    /// Demo: creates a throwing stream that is fed by a new `UpdateMonitor`.
    ///
    /// Every update produced by the monitor is logged and yielded into the
    /// stream; an error reported through the monitor's `errorHandler` finishes
    /// the stream by throwing it. The monitor is started before the stream is
    /// returned and is stopped when the stream terminates.
    ///
    /// - Returns: A stream of the updates produced by the monitor.
    static func makeStream() -> AsyncThrowingStream<Update, Error> {
        AsyncThrowingStream { continuation in
            let monitor = UpdateMonitor(
                updateHandler: { update in
                    print("Received update: \(update.id)")
                    // Once the stream has terminated it accepts no more elements,
                    // so ask the producer to stop instead of yielding in vain.
                    if case .terminated = continuation.yield(update) {
                        update.finishStream()
                    }
                },
                errorHandler: { error in
                    continuation.finish(throwing: error)
                }
            )

            // Registered before starting so that no termination can be missed.
            // The closure also keeps the monitor alive for the stream's lifetime.
            continuation.onTermination = { @Sendable status in
                print("Stream terminated with status: \(status)")
                monitor.stop()
            }

            print("Starting stream…")
            monitor.start()
        }
    }
}
// Demo consumer: handles the updates one at a time and stops after ID 10.
let stream = UpdateMonitor.makeStream()

Task {
    do {
        for try await update in stream {
            // Simulate slow processing of each update.
            let seconds = 0.5
            let duration = UInt64(seconds * 1_000_000_000)
            try await Task.sleep(nanoseconds: duration)

            // Decide whether we still want to receive updates.
            guard update.id <= 10 else {
                update.finishStream()
                //
                // Cleanup code here
                //
                break
            }

            print("Processing update with ID: \(update.id)")
            //
            // Process the received update here
            //
        }
        print("Stream done.")
    } catch {
        print("Received error: \(error)")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment