Skip to content

Instantly share code, notes, and snippets.

@serhiybutz
Created July 21, 2020 17:14
Show Gist options
  • Save serhiybutz/2a4297692c482567622973ffbf2aa0f3 to your computer and use it in GitHub Desktop.
Save serhiybutz/2a4297692c482567622973ffbf2aa0f3 to your computer and use it in GitHub Desktop.
Combine: withLatestFrom, 03
import Combine
import Foundation
extension Publishers {
public struct WithLatestFrom<Upstream: Publisher, Other: Publisher>: Publisher where Upstream.Failure == Other.Failure {
// MARK: - Types
public typealias Output = (Upstream.Output, Other.Output)
public typealias Failure = Upstream.Failure
private enum MergedElement {
case upstream1(Upstream.Output)
case upstream2(Other.Output)
}
private typealias ScanResult = (output1: Upstream.Output?, output2: Other.Output?, shouldEmit: Bool)
// MARK: - Properties
private let upstream: Upstream
private let other: Other
// MARK: - Initialization
init(upstream: Upstream, other: Other) {
self.upstream = upstream
self.other = other
}
// MARK: - Publisher
public func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
let merged = mergedStream(upstream, other)
let result = resultStream(from: merged)
result.subscribe(subscriber)
}
}
}
// MARK: - Helpers
extension Publishers.WithLatestFrom {
private func mergedStream(_ upstream1: Upstream, _ upstream2: Other) -> AnyPublisher<MergedElement, Failure> {
let mergedElementUpstream1 = upstream1.map { MergedElement.upstream1($0) }
let mergedElementUpstream2 = upstream2.map { MergedElement.upstream2($0) }
return mergedElementUpstream1
.merge(with: mergedElementUpstream2)
.eraseToAnyPublisher()
}
private func resultStream(from mergedStream: AnyPublisher<MergedElement, Failure>) -> AnyPublisher<Output, Failure> {
mergedStream
.scan(nil) { (prevResult: ScanResult?, mergedElement: MergedElement) -> ScanResult? in
var newOutput1: Upstream.Output?
var newOutput2: Other.Output?
let shouldEmit: Bool
switch mergedElement {
case .upstream1(let v):
newOutput1 = v
shouldEmit = prevResult?.output2 != nil
case .upstream2(let v):
newOutput2 = v
shouldEmit = false
}
return ScanResult(output1: newOutput1 ?? prevResult?.output1,
output2: newOutput2 ?? prevResult?.output2,
shouldEmit: shouldEmit)
}
.compactMap { $0 }
.filter { $0.shouldEmit }
.map { Output($0.output1!, $0.output2!) }
.eraseToAnyPublisher()
}
}
extension Publisher {
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other>
{
return .init(upstream: self, other: other)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment