Skip to content

Instantly share code, notes, and snippets.

@markmals
Last active June 24, 2021 09:05
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markmals/f36a15d81d6f439a186a2ddbfc992867 to your computer and use it in GitHub Desktop.
Save markmals/f36a15d81d6f439a186a2ddbfc992867 to your computer and use it in GitHub Desktop.
Extensions to Apple's Combine framework to interoperate with Swift 5.5's new concurrency features
import Combine
extension Publisher {
var stream: AsyncThrowingStream<Output> {
AsyncThrowingStream(Output.self) { continuation in
let cancellable = sink { completion in
switch completion {
case .finished: continuation.finish()
case .failure(let error): continuation.finish(throwing: error)
}
} receiveValue: { output in
continuation.yield(output)
}
continuation.onTermination = { _ in cancellable.cancel() }
}
}
}
extension Publisher where Failure == Never {
var stream: AsyncStream<Output> {
AsyncStream(Output.self) { continuation in
let cancellable = sink { _ in
continuation.finish()
} receiveValue: { output in
continuation.yield(output)
}
continuation.onTermination = { _ in cancellable.cancel() }
}
}
}
extension AsyncSequence {
var first: Element? {
get async throws {
try await self.first { _ in true }
}
}
}
protocol AsyncPublisherSequence: Publisher, AsyncSequence {}
extension AsyncPublisherSequence where Element == Output {
func makeAsyncIterator() -> AsyncThrowingStream<Output>.Iterator {
stream.makeAsyncIterator()
}
}
extension AsyncPublisherSequence where Element == Output, Failure == Never {
func makeAsyncIterator() -> AsyncStream<Output>.Iterator {
stream.makeAsyncIterator()
}
}
extension Published.Publisher: AsyncPublisherSequence {}
extension AnyPublisher: AsyncPublisherSequence {}
extension Publishers.Sequence: AsyncPublisherSequence {}
extension Publishers.CombineLatest: AsyncPublisherSequence {}
extension Publishers.CombineLatest3: AsyncPublisherSequence {}
extension Publishers.CombineLatest4: AsyncPublisherSequence {}
extension Publishers.Merge: AsyncPublisherSequence {}
extension Publishers.Merge3: AsyncPublisherSequence {}
extension Publishers.Merge4: AsyncPublisherSequence {}
extension Publishers.Merge5: AsyncPublisherSequence {}
extension Publishers.Merge6: AsyncPublisherSequence {}
extension Publishers.Merge7: AsyncPublisherSequence {}
extension Publishers.Merge8: AsyncPublisherSequence {}
extension Publishers.MergeMany: AsyncPublisherSequence {}
extension Publishers.Zip: AsyncPublisherSequence {}
extension Publishers.Zip3: AsyncPublisherSequence {}
extension Publishers.Zip4: AsyncPublisherSequence {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment