Skip to content

Instantly share code, notes, and snippets.

@mattmook
Created September 8, 2021 06:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattmook/f886d12c5f03424b6ba2eee8c1141afb to your computer and use it in GitHub Desktop.
Save mattmook/f886d12c5f03424b6ba2eee8c1141afb to your computer and use it in GitHub Desktop.
OrbitMultiplatform-Publisher.swift
import Foundation
import Combine
import shared
public extension Kotlinx_coroutines_coreFlow {
func asPublisher<T: AnyObject>() -> AnyPublisher<T, Never> {
(FlowPublisher(flow: self) as FlowPublisher<T>).eraseToAnyPublisher()
}
}
private struct FlowPublisher<T: Any>: Publisher {
public typealias Output = T
public typealias Failure = Never
private let flow: Kotlinx_coroutines_coreFlow
public init(flow: Kotlinx_coroutines_coreFlow) {
self.flow = flow
}
public func receive<S: Subscriber>(subscriber: S) where S.Input == T, S.Failure == Failure {
let subscription = FlowSubscription(flow: flow, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
final class FlowSubscription<S: Subscriber>: Subscription where S.Input == T, S.Failure == Failure {
private var subscriber: S?
private var job: Kotlinx_coroutines_coreJob?
private let flow: Kotlinx_coroutines_coreFlow
init(flow: Kotlinx_coroutines_coreFlow, subscriber: S) {
self.flow = flow
self.subscriber = subscriber
job = SubscribeKt.subscribe(
flow,
onEach: { position in if let position = position as? T { _ = subscriber.receive(position) }},
onComplete: { subscriber.receive(completion: .finished) },
onThrow: { error in debugPrint(error) }
)
}
func cancel() {
subscriber = nil
job?.cancel(cause: nil)
}
func request(_ demand: Subscribers.Demand) {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment