Skip to content

Instantly share code, notes, and snippets.

@gbasile
Last active June 21, 2021 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gbasile/955b22e7334dc5c5dcbef8ed0e7fccde to your computer and use it in GitHub Desktop.
Save gbasile/955b22e7334dc5c5dcbef8ed0e7fccde to your computer and use it in GitHub Desktop.
import Foundation
import shared
import Combine
/*
TODO:
- Handle backpressure
- Handle cancellation
*/
extension Kotlinx_coroutines_coreStateFlow {
func producer<O, E>() -> AnyPublisher<O, E> {
return KMMPublisher<O, E>(stateFlow: self)
.eraseToAnyPublisher()
}
}
struct KMMPublisher<O, E: Swift.Error>: Publisher {
typealias Failure = E
typealias Output = O
fileprivate var stateFlow: Kotlinx_coroutines_coreStateFlow
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
let collector = SwiftCollector<O> { value in
_ = subscriber.receive(value) //ignoring backpressure at the moment
}
stateFlow.collect(collector: collector) { (result, error) in
if let error = error as? E {
subscriber.receive(completion: .failure(error))
} else {
subscriber.receive(completion: .finished)
}
}
}
}
class SwiftCollector<T>: Kotlinx_coroutines_coreFlowCollector {
let callback:(T) -> Void
init(callback: @escaping (T) -> Void) {
self.callback = callback
}
func emit(value: Any?, completionHandler: @escaping (KotlinUnit?, Error?) -> Void) {
// do whatever you what with the emitted value
callback(value as! T)
// after you finished your work you need to call completionHandler to
// tell that you consumed the value and the next value can be consumed,
// otherwise you will not receive the next value
//
// i think first parameter can be always nil or KotlinUnit()
// second parameter is for an error which occurred while consuming the value
// passing an error object will throw a NSGenericException in kotlin code, which can be handled or your app will crash
completionHandler(KotlinUnit(), nil)
}
}
class ViewModel: ObservableObject {
@Published var session: Session?
private let spike = SpikeStateFlow()
private var cancellable: AnyCancellable?
init() {
cancellable = spike
.updates
.producer()
.sink(receiveValue: { session in
self.session = session
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment