Skip to content

Instantly share code, notes, and snippets.

@ABridoux
Created January 29, 2022 11:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ABridoux/913dfb95fabc22048b0a1c566059105a to your computer and use it in GitHub Desktop.
Save ABridoux/913dfb95fabc22048b0a1c566059105a to your computer and use it in GitHub Desktop.
An AsyncSequence that can be consumed several times, returning the current state as accumulated by a reduce function.
/// An `AsyncSequence` that can be iterated several times.
///
/// Values produced by the wrapped `AsyncStream` are folded into a single stored
/// result with `reduce`. The first consumer receives only the live values; every
/// later consumer first receives the current stored result and then the live
/// values that follow it.
struct ReducedReplayAsyncStream<Element> {

    /// Folds `nextResult` into `partialResult`.
    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private let originalStream: AsyncStream<Element>

    /// Creates a replayable stream.
    ///
    /// - Parameters:
    ///   - limit: Buffering policy forwarded to the wrapped `AsyncStream`.
    ///   - initialResult: Starting value of the stored (reduced) result.
    ///   - reduce: Folds each new value into the stored result.
    ///   - build: Receives the continuation of the wrapped `AsyncStream`.
    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    /// Makes a fresh stream for one consumer and registers it with the storage.
    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                // Registration is a single actor call, so "am I the first consumer?",
                // "replay the stored result" and "subscribe to live values" cannot be
                // interleaved with another consumer or with an incoming value.
                let isFirstConsumer = await storage.register(continuation)

                // Start only after the first continuation is registered so that it
                // cannot miss values already buffered in the original stream.
                if isFirstConsumer {
                    startConsumingOriginalStream()
                }
            }
        }
    }

    /// Forwards every value of the original stream to the storage, then finishes
    /// all consumers once the original stream ends.
    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.finish()
        }
    }
}

extension ReducedReplayAsyncStream {

    /// Serializes access to the reduced result and to the consumers' continuations.
    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce
        private var didStart = false
        private var isFinished = false
        private var stored: Element
        private var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        /// Registers a consumer in one atomic step.
        ///
        /// Any consumer but the first is immediately sent the current stored result.
        /// A consumer arriving after the original stream ended is finished right
        /// away instead of being left waiting forever.
        ///
        /// - Returns: `true` when this is the first consumer, i.e. the caller has to
        ///   start consuming the original stream.
        func register(_ continuation: AsyncStream<Element>.Continuation) -> Bool {
            let isFirstConsumer = !didStart
            didStart = true

            if !isFirstConsumer {
                continuation.yield(stored)
            }
            guard !isFinished else {
                continuation.finish()
                return false
            }
            continuations.append(continuation)
            return isFirstConsumer
        }

        /// Folds `value` into the stored result and forwards it to every consumer,
        /// dropping the consumers whose stream has terminated.
        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations = continuations.filter { continuation in
                if case .terminated = continuation.yield(value) {
                    return false
                }
                return true
            }
        }

        /// Marks the original stream as ended and finishes every registered consumer.
        func finish() {
            isFinished = true
            for continuation in continuations {
                continuation.finish()
            }
            continuations.removeAll()
        }
    }
}
// MARK: - AsyncSequence

extension ReducedReplayAsyncStream: AsyncSequence {

    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    /// Returns the iterator of a newly made stream, so each call to this method
    /// gets its own stream.
    func makeAsyncIterator() -> AsyncIterator {
        makeStream().makeAsyncIterator()
    }
}
@ABridoux
Copy link
Author

ABridoux commented Jan 29, 2022

An example

// Usage example: a timer feeds the integers 0...9 into the replay stream while two
// tasks consume it, the second one subscribing three seconds after the first.
var subscriptions: Set<AnyCancellable> = []
// Set synchronously by the `build` closure below.
var continuation: AsyncStream<Int>.Continuation?

let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult += nextResult },
    build: { continuation = $0 }
)

var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        guard counter < 10 else {
            // Finish the stream once, then stop the timer instead of letting it
            // fire (and keep yielding into a finished stream) forever.
            continuation?.finish()
            subscriptions.removeAll()
            return
        }
        continuation?.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)

// First consumer: receives each value as it is emitted.
Task {
    for await value in replayStream {
        print("[A]", value)
    }
}

// Late consumer: first receives the sum reduced so far, then the following values.
Task {
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}

Considerations

Efficiency

Using an actor as the private storage might not be as efficient as a dispatch queue or a thread lock, but I think it feels more natural with structured concurrency. This could be improved if high performance is needed.

Copy on write

The structure does not implement copy-on-write. I wonder whether it would make sense for an AsyncSequence, since some sequences I have used do not seem to implement it. For instance, an AsyncStream will assert against two tasks awaiting its next element.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment