Skip to content

Instantly share code, notes, and snippets.

@AndreyAnt
Created November 5, 2021 02:49
Show Gist options
  • Save AndreyAnt/f6a755edf1a1842512f98c1bd12a3721 to your computer and use it in GitHub Desktop.
Save AndreyAnt/f6a755edf1a1842512f98c1bd12a3721 to your computer and use it in GitHub Desktop.
Marin Todorov AsyncSequence wrapper for Combine.Publisher
class CombineAsyncStream<Upstream: Publisher>: AsyncSequence {
typealias Element = Upstream.Output
typealias AsyncIterator = CombineAsyncStream<Upstream>
func makeAsyncIterator() -> Self {
return self
}
private let stream:
AsyncThrowingStream<Upstream.Output, Error>
private lazy var iterator = stream.makeAsyncIterator()
private var cancellable: AnyCancellable?
public init(_ upstream: Upstream) {
var subscription: AnyCancellable? = nil
stream = AsyncThrowingStream<Upstream.Output, Error>
(Upstream.Output.self) { continuation in
subscription = upstream
.handleEvents(
receiveCancel: {
continuation.finish(throwing: nil)
}
)
.sink(receiveCompletion: { completion in
switch completion {
case .failure(let error):
continuation.finish(throwing: error)
case .finished: continuation.finish(throwing: nil)
}
}, receiveValue: { value in
continuation.yield(value)
})
}
cancellable = subscription
}
func cancel() {
cancellable?.cancel()
cancellable = nil
}
}
extension CombineAsyncStream: AsyncIteratorProtocol {
public func next() async throws -> Upstream.Output? {
return try await iterator.next()
}
}
extension Publisher {
func asyncStream() -> CombineAsyncStream<Self> {
return CombineAsyncStream(self)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment