Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkersey/b6c83aeb8e20787e9eb8ba4d0d54dba5 to your computer and use it in GitHub Desktop.
Save tkersey/b6c83aeb8e20787e9eb8ba4d0d54dba5 to your computer and use it in GitHub Desktop.
An AsyncSequence that can be consumed several times. Each later consumer first receives the current state, as accumulated by a reduce function.
/// An asynchronous sequence that can be iterated several times.
///
/// The values produced by a single upstream `AsyncStream` are folded into a
/// stored state with `reduce` and forwarded, unchanged, to every registered
/// subscriber. The first subscriber triggers consumption of the upstream and
/// receives no replay; every later subscriber first receives the current
/// stored state and then the subsequent upstream values.
struct ReducedReplayAsyncStream<Element> {
    /// Folds `nextResult` into the accumulated `partialResult`.
    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    /// Shared state: accumulated value, live subscribers, lifecycle flags.
    private let storage: _Storage
    /// The single upstream stream; it is iterated by exactly one task.
    private let originalStream: AsyncStream<Element>

    /// Creates the stream.
    ///
    /// - Parameters:
    ///   - limit: Buffering policy applied to the upstream `AsyncStream`.
    ///   - initialResult: Starting value of the stored state.
    ///   - reduce: Folds each upstream value into the stored state.
    ///   - build: Receives the upstream continuation used to produce values.
    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    /// Builds a per-subscriber stream and registers its continuation.
    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                // Registration, replay and the "am I first?" decision happen in
                // ONE actor call, so no upstream value can slip in between them
                // and only one subscriber can ever start the upstream.
                if await storage.subscribe(continuation) {
                    // Started only after the first continuation is registered,
                    // so the first subscriber cannot miss early values.
                    startConsumingOriginalStream()
                }
            }
        }
    }

    /// Pumps the upstream into the storage, then finishes all subscribers.
    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.finish()
        }
    }
}

extension ReducedReplayAsyncStream {
    /// Actor serialising access to the reduced state and the subscriber list.
    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce
        /// `true` once the first subscriber has been registered.
        private var didStart = false
        /// `true` once the upstream has ended.
        private var isFinished = false
        /// Current accumulated state.
        private var stored: Element
        /// Continuations of subscribers that are still receiving values.
        private var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        /// Atomically registers a subscriber.
        ///
        /// Later subscribers are sent the current stored state before being
        /// registered. A subscriber arriving after the upstream has ended is
        /// sent the stored state and finished immediately instead of being
        /// left waiting forever.
        ///
        /// - Returns: `true` only for the very first subscriber, which must
        ///   then start consuming the upstream.
        func subscribe(_ continuation: AsyncStream<Element>.Continuation) -> Bool {
            let isFirst = !didStart
            didStart = true

            if !isFirst {
                continuation.yield(stored)
            }
            if isFinished {
                continuation.finish()
                return false
            }
            continuations.append(continuation)
            return isFirst
        }

        /// Folds `value` into the stored state and forwards it to subscribers.
        func updateWith(value: Element) {
            reduce(&stored, value)

            // Drop continuations whose consumer has gone away so the list does
            // not grow without bound.
            var live: [AsyncStream<Element>.Continuation] = []
            live.reserveCapacity(continuations.count)
            for continuation in continuations {
                if case .terminated = continuation.yield(value) { continue }
                live.append(continuation)
            }
            continuations = live
        }

        /// Marks the upstream as ended and finishes every subscriber.
        func finish() {
            isFinished = true
            for continuation in continuations {
                continuation.finish()
            }
            continuations.removeAll()
        }
    }
}
// MARK: - AsyncSequence

extension ReducedReplayAsyncStream: AsyncSequence {
    /// Iteration is delegated to a plain `AsyncStream` iterator.
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    /// Returns the iterator of a stream freshly built by `makeStream()`;
    /// each call builds a new stream.
    func makeAsyncIterator() -> AsyncIterator {
        makeStream().makeAsyncIterator()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment