Skip to content

Instantly share code, notes, and snippets.

@fatbobman
Created December 5, 2021 01:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fatbobman/09954daa67f8f78cb11c0ff9f8bcb318 to your computer and use it in GitHub Desktop.
Save fatbobman/09954daa67f8f78cb11c0ff9f8bcb318 to your computer and use it in GitHub Desktop.
public struct CombineAsyncPublsiher<P>: AsyncSequence, AsyncIteratorProtocol where P: Publisher, P.Failure == Never {
public typealias Element = P.Output
public typealias AsyncIterator = CombineAsyncPublsiher<P>
public func makeAsyncIterator() -> Self {
return self
}
private let stream: AsyncStream<P.Output>
private var iterator: AsyncStream<P.Output>.Iterator
private var cancellable: AnyCancellable?
public init(_ upstream: P, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
var subscription: AnyCancellable?
stream = AsyncStream<P.Output>(P.Output.self, bufferingPolicy: limit) { continuation in
subscription = upstream
.sink(receiveValue: { value in
continuation.yield(value)
})
}
cancellable = subscription
iterator = stream.makeAsyncIterator()
}
public mutating func next() async -> P.Output? {
await iterator.next()
}
}
public extension Publisher where Self.Failure == Never {
var sequence: CombineAsyncPublsiher<Self> {
CombineAsyncPublsiher(self)
}
}
public struct CombineAsyncThrowingPublsiher<P: Publisher>: AsyncSequence, AsyncIteratorProtocol {
public typealias Element = P.Output
public typealias AsyncIterator = CombineAsyncThrowingPublsiher<P>
public func makeAsyncIterator() -> Self {
return self
}
private let stream: AsyncThrowingStream<P.Output, Error>
private var iterator: AsyncThrowingStream<P.Output, Error>.AsyncIterator // = stream.makeAsyncIterator()
private var cancellable: AnyCancellable?
public init(_ upstream: P, bufferingPolicy limit: AsyncThrowingStream<Element, Error>.Continuation.BufferingPolicy = .unbounded) {
var subscription: AnyCancellable?
stream = AsyncThrowingStream<P.Output, Error>(P.Output.self, bufferingPolicy: limit) { continuation in
subscription = upstream
.handleEvents(
receiveCancel: {
continuation.finish(throwing: nil)
}
)
.sink(receiveCompletion: { completion in
switch completion {
case .failure(let error):
continuation.finish(throwing: error)
case .finished: continuation.finish(throwing: nil)
}
}, receiveValue: { value in
continuation.yield(value)
})
}
cancellable = subscription
iterator = stream.makeAsyncIterator()
}
public mutating func cancel() {
cancellable?.cancel()
cancellable = nil
}
public mutating func next() async throws -> P.Output? {
try await iterator.next()
}
}
public extension Publisher {
var sequence: CombineAsyncThrowingPublsiher<Self> {
CombineAsyncThrowingPublsiher(self)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment