Last active
June 17, 2024 15:47
-
-
Save Blackjacx/82a988a14e6256bb5b1a7db3284bed14 to your computer and use it in GitHub Desktop.
AsyncStream Non-Concurrent Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import UIKit | |
/// This object should contain everything we need to process async, | |
/// non-concurrent stream updates. | |
struct Update { | |
var id: Int | |
var finishStream: () -> Void | |
} | |
final class UpdateMonitor { | |
private var id: Int = 0 | |
private var shouldStop = false | |
var updateHandler: (Update) -> Void | |
var errorHandler: (Error) -> Void | |
init(updateHandler: @escaping (Update) -> Void, errorHandler: @escaping (Error) -> Void) { | |
self.updateHandler = updateHandler | |
self.errorHandler = errorHandler | |
} | |
func start() { | |
Task { [weak self] in | |
while true { | |
guard let self, shouldStop == false else { | |
self?.shouldStop = true | |
break | |
} | |
let update = Update(id: id, finishStream: { [weak self] in | |
guard let self else { return } | |
shouldStop = true | |
}) | |
id += 1 | |
updateHandler(update) | |
} | |
} | |
} | |
func stop() { | |
shouldStop = true | |
} | |
} | |
extension UpdateMonitor { | |
// Demo: Trigger updates | |
static func makeStream() -> AsyncThrowingStream<Update, Error> { | |
AsyncThrowingStream { continuation in | |
let monitor = UpdateMonitor(updateHandler: { update in | |
print("Received update: \(update.id))") | |
continuation.yield(update) | |
}, errorHandler: { error in | |
continuation.finish(throwing: error) | |
}) | |
continuation.onTermination = { @Sendable status in | |
print("Stream terminated with status: \(status)") | |
monitor.stop() | |
} | |
print("Starting stream…") | |
monitor.start() | |
} | |
} | |
} | |
var stream = UpdateMonitor.makeStream() | |
Task { | |
do { | |
for try await update in stream { | |
let seconds = 0.5 | |
let duration = UInt64(seconds * 1000000000) | |
try await Task.sleep(nanoseconds: duration) | |
// Check for if we still receive updates here | |
guard update.id <= 10 else { | |
update.finishStream() | |
// | |
// Cleanup code here | |
// | |
break | |
} | |
print("Processing update with ID: \(update.id)") | |
// | |
// Process the received update here | |
// | |
} | |
print("Stream done.") | |
} catch { | |
print("Received error: \(error))") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment