-
-
Save fatbobman/09954daa67f8f78cb11c0ff9f8bcb318 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public struct CombineAsyncPublsiher<P>: AsyncSequence, AsyncIteratorProtocol where P: Publisher, P.Failure == Never { | |
public typealias Element = P.Output | |
public typealias AsyncIterator = CombineAsyncPublsiher<P> | |
public func makeAsyncIterator() -> Self { | |
return self | |
} | |
private let stream: AsyncStream<P.Output> | |
private var iterator: AsyncStream<P.Output>.Iterator | |
private var cancellable: AnyCancellable? | |
public init(_ upstream: P, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) { | |
var subscription: AnyCancellable? | |
stream = AsyncStream<P.Output>(P.Output.self, bufferingPolicy: limit) { continuation in | |
subscription = upstream | |
.sink(receiveValue: { value in | |
continuation.yield(value) | |
}) | |
} | |
cancellable = subscription | |
iterator = stream.makeAsyncIterator() | |
} | |
public mutating func next() async -> P.Output? { | |
await iterator.next() | |
} | |
} | |
public extension Publisher where Self.Failure == Never { | |
var sequence: CombineAsyncPublsiher<Self> { | |
CombineAsyncPublsiher(self) | |
} | |
} | |
public struct CombineAsyncThrowingPublsiher<P: Publisher>: AsyncSequence, AsyncIteratorProtocol { | |
public typealias Element = P.Output | |
public typealias AsyncIterator = CombineAsyncThrowingPublsiher<P> | |
public func makeAsyncIterator() -> Self { | |
return self | |
} | |
private let stream: AsyncThrowingStream<P.Output, Error> | |
private var iterator: AsyncThrowingStream<P.Output, Error>.AsyncIterator // = stream.makeAsyncIterator() | |
private var cancellable: AnyCancellable? | |
public init(_ upstream: P, bufferingPolicy limit: AsyncThrowingStream<Element, Error>.Continuation.BufferingPolicy = .unbounded) { | |
var subscription: AnyCancellable? | |
stream = AsyncThrowingStream<P.Output, Error>(P.Output.self, bufferingPolicy: limit) { continuation in | |
subscription = upstream | |
.handleEvents( | |
receiveCancel: { | |
continuation.finish(throwing: nil) | |
} | |
) | |
.sink(receiveCompletion: { completion in | |
switch completion { | |
case .failure(let error): | |
continuation.finish(throwing: error) | |
case .finished: continuation.finish(throwing: nil) | |
} | |
}, receiveValue: { value in | |
continuation.yield(value) | |
}) | |
} | |
cancellable = subscription | |
iterator = stream.makeAsyncIterator() | |
} | |
public mutating func cancel() { | |
cancellable?.cancel() | |
cancellable = nil | |
} | |
public mutating func next() async throws -> P.Output? { | |
try await iterator.next() | |
} | |
} | |
public extension Publisher { | |
var sequence: CombineAsyncThrowingPublsiher<Self> { | |
CombineAsyncThrowingPublsiher(self) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment