Skip to content

Instantly share code, notes, and snippets.

@serbats
Last active September 13, 2020 20:52
Show Gist options
  • Save serbats/e7effd6c96a56e2377f36618061088ef to your computer and use it in GitHub Desktop.
Save serbats/e7effd6c96a56e2377f36618061088ef to your computer and use it in GitHub Desktop.
Missing Apple Combine Operators: withLatestFrom, materialize, dematerialize
import Foundation
import Combine
// Base Abstraction Classes
/// Helper for handling backpressure.
///
/// `DownstreamSubscription` uses a conforming type as its delegate: every
/// demand request, upstream value, upstream completion and cancellation it
/// sees is forwarded to the helper, which decides what reaches the real
/// downstream subscriber.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
protocol DownStreamHelperProtocol {
/// Type of the values delivered by the upstream publisher.
associatedtype Input
/// Type of the failure the upstream publisher may complete with.
associatedtype Failure: Error
/// Called with the demand requested by downstream.
/// `DownstreamSubscription` forwards the returned demand to the upstream
/// subscription when it is greater than `.none`.
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand
/// Called for every value received from upstream.
/// The returned demand is handed back to upstream as the result of
/// `receive(_:)`.
func downstream(_ value: Input) -> Subscribers.Demand
/// Called when upstream delivers its completion.
func downstream(_ completion: Subscribers.Completion<Failure>)
/// Called when `DownstreamSubscription` is cancelled, before the upstream
/// subscription itself is cancelled.
func cancel()
}
/// Bridges an upstream publisher and an operator's downstream helper.
///
/// The object plays two roles at once:
/// * it is the `Subscriber` attached to `upstream` and keeps the resulting
///   upstream subscription;
/// * it is the `Subscription` handed to the operator's downstream subscriber.
///
/// Every event is delegated to `downStream` (the helper), which decides what
/// is forwarded. The upstream publisher is subscribed lazily, on the first
/// positive demand request.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class DownstreamSubscription<Upstream: Publisher, Helper: DownStreamHelperProtocol>: Subscription, Subscriber where Upstream.Output == Helper.Input, Upstream.Failure == Helper.Failure {
    public typealias Failure = Upstream.Failure
    public typealias Input = Upstream.Output

    private let upstream: Upstream
    private let downStream: Helper
    private var upstreamSubscription: Subscription?
    /// Set once `cancel()` has run; prevents re-subscribing to `upstream`.
    private var isCancelled = false
    private var mutex = pthread_mutex_t()

    /// - Parameters:
    ///   - upstream: Publisher that is subscribed on the first demand request.
    ///   - downStream: Helper that receives every delegated event.
    init(upstream: Upstream, downStream: Helper) {
        pthread_mutex_init(&mutex, nil)
        self.upstream = upstream
        self.downStream = downStream
    }

    deinit {
        // Pairs with `pthread_mutex_init` in `init`.
        pthread_mutex_destroy(&mutex)
    }

    /// Subscribes to `upstream` if that has not happened yet, lets the helper
    /// adjust the demand and forwards the adjusted demand upstream.
    ///
    /// Requests of `.none` and requests made after `cancel()` are ignored.
    func request(_ demand: Subscribers.Demand) {
        guard demand > .none, !isCancelled else { return }

        if upstreamSubscription == nil {
            // https://en.wikipedia.org/wiki/Double-checked_locking
            pthread_mutex_lock(&mutex)
            if upstreamSubscription == nil && !isCancelled {
                // This is expected to call `receive(subscription:)`, which
                // stores the upstream subscription. That method must not take
                // `mutex`: it may run synchronously while the lock is held.
                upstream.subscribe(self)
            }
            pthread_mutex_unlock(&mutex)
        }

        let demandModified = downStream.request(demand)
        if demandModified > .none {
            upstreamSubscription?.request(demandModified)
        }
    }

    /// Cancels the helper first, then the upstream subscription, and marks
    /// the subscription as cancelled so later requests become no-ops.
    func cancel() {
        isCancelled = true
        downStream.cancel()
        upstreamSubscription?.cancel()
        upstreamSubscription = nil
    }

    /// Forwards an upstream value to the helper and returns the helper's
    /// demand adjustment to upstream.
    func receive(_ input: Input) -> Subscribers.Demand {
        return downStream.downstream(input)
    }

    /// Stores the upstream subscription, or cancels it right away when this
    /// subscription has already been cancelled.
    func receive(subscription: Subscription) {
        guard !isCancelled else {
            subscription.cancel()
            return
        }
        upstreamSubscription = subscription
    }

    /// Forwards the upstream completion to the helper.
    func receive(completion: Subscribers.Completion<Failure>) {
        downStream.downstream(completion)
    }
}
import Combine
// MARK: - Event enum
/// A publisher event turned into a plain value: either an emitted element or
/// the completion that terminated the stream.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public enum Event<Value, Failure: Error> {
    /// An element emitted by the publisher.
    case value(Value)
    /// The completion (finished or failure) of the publisher.
    case completion(Subscribers.Completion<Failure>)

    /// The failure carried by this event.
    ///
    /// - Returns: The error when the event is `.completion(.failure(_))`;
    ///   `nil` for values and for `.completion(.finished)`.
    func error() -> Failure? {
        guard case .completion(.failure(let error)) = self else { return nil }
        return error
    }
}
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
    /// Wraps every element and the completion of this publisher into `Event`
    /// values, producing a publisher that never fails.
    ///
    /// - Parameter forceRecieveCompletion: When `true` (the default) the
    ///   completion event is delivered even if the subscriber currently
    ///   requests `.none` elements; when `false` it is held back until the
    ///   subscriber asks for more.
    /// - Returns: A publisher of `Event`s.
    func materialize(forceRecieveCompletion: Bool = true)
        -> Publishers.Materialize<Self> {
        let materialized = Publishers.Materialize<Self>(
            upstream: self,
            forceRecieveCompletion: forceRecieveCompletion
        )
        return materialized
    }

    /// Unwraps a publisher of `Event`s back into a regular publisher.
    ///
    /// - Returns: A publisher that emits the wrapped values and completes
    ///   with the wrapped completion.
    func dematerialize<Value, Failure: Error>()
        -> Publishers.Dematerialize<Self, Value, Failure> where Self.Output == Event<Value, Failure>, Self.Failure == Never {
        let dematerialized = Publishers.Dematerialize<Self, Value, Failure>(upstream: self)
        return dematerialized
    }
}
// MARK: - Publisher
extension Publishers {
    /// Publisher that re-emits the elements and the completion of `Upstream`
    /// as `Event` values and therefore never fails itself.
    @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public struct Materialize<Upstream: Publisher>: Publisher {
        public typealias Output = Event<Upstream.Output, Upstream.Failure>
        public typealias Failure = Never

        private let forceRecieveCompletion: Bool
        private let upstream: Upstream

        /// - Parameters:
        ///   - upstream: Publisher whose events are wrapped.
        ///   - forceRecieveCompletion: Selects which helper handles the
        ///     subscriber: the "force receive" one when `true`, the
        ///     "wait receive" one when `false`.
        init(upstream: Upstream, forceRecieveCompletion: Bool) {
            self.upstream = upstream
            self.forceRecieveCompletion = forceRecieveCompletion
        }

        /// Wraps `subscriber` in the helper matching `forceRecieveCompletion`
        /// and hands it a `DownstreamSubscription` bound to `upstream`.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let subscription: Subscription
            if forceRecieveCompletion {
                let forcing = DownStreamHelperForceReceive(subscriber: subscriber)
                subscription = DownstreamSubscription(upstream: upstream, downStream: forcing)
            } else {
                let waiting = DownStreamHelperWaitReceive(subscriber: subscriber)
                subscription = DownstreamSubscription(upstream: upstream, downStream: waiting)
            }
            subscriber.receive(subscription: subscription)
        }
    }

    /// Publisher that turns an upstream of `Event`s back into plain values
    /// plus a completion of type `Failure`.
    @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public struct Dematerialize<Upstream: Publisher, Value, Failure: Error>: Publisher where Upstream.Output == Event<Value, Failure>, Upstream.Failure == Never {
        public typealias Output = Value

        private let upstream: Upstream

        /// - Parameter upstream: Publisher of events to unwrap.
        init(upstream: Upstream) {
            self.upstream = upstream
        }

        /// Wraps `subscriber` in the dematerializing helper and hands it a
        /// `DownstreamSubscription` bound to `upstream`.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let unwrapping = DownStreamHelper<S>(subscriber: subscriber)
            let subscription = DownstreamSubscription(upstream: upstream, downStream: unwrapping)
            subscriber.receive(subscription: subscription)
        }
    }
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.Materialize {
    /// Helper that delivers the completion event as soon as upstream
    /// completes, regardless of the demand the subscriber has expressed.
    private final class DownStreamHelperForceReceive<S: Subscriber>: DownStreamHelperProtocol where S.Input == Event<Upstream.Output, Upstream.Failure>, S.Failure == Never {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private var subscriber: S?

        init(subscriber: S) {
            self.subscriber = subscriber
        }

        /// Passes the downstream demand through unchanged.
        func request(_ demand: Subscribers.Demand) -> Subscribers.Demand {
            return demand
        }

        /// Wraps the value into `Event.value` and returns the subscriber's
        /// demand adjustment (`.none` once the subscriber has been released).
        func downstream(_ value: Input) -> Subscribers.Demand {
            return subscriber?.receive(Event.value(value)) ?? .none
        }

        /// Emits the completion as a last `Event`, finishes the subscriber
        /// and releases it.
        func downstream(_ completion: Subscribers.Completion<Failure>) {
            _ = subscriber?.receive(Event.completion(completion))
            subscriber?.receive(completion: .finished)
            subscriber = nil
        }

        /// Releases the subscriber so nothing more is delivered.
        func cancel() {
            subscriber = nil
        }
    }

    /// Helper that tracks the outstanding demand and holds the completion
    /// event back until the subscriber has demand for it.
    private final class DownStreamHelperWaitReceive<S: Subscriber>: DownStreamHelperProtocol where S.Input == Event<Upstream.Output, Upstream.Failure>, S.Failure == Never {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private var subscriber: S?
        /// Demand requested by the subscriber that has not been served yet.
        private var demand: Subscribers.Demand = .none
        /// Upstream completion that arrived while there was no demand.
        private var completion: Subscribers.Completion<Upstream.Failure>?
        // Recursive on purpose: the subscriber is called while the lock is
        // held and may synchronously call back into `request(_:)` or
        // `cancel()` through its subscription.
        private let lock = NSRecursiveLock()

        init(subscriber: S) {
            self.subscriber = subscriber
        }

        /// Records new demand, or - when a completion is already pending -
        /// delivers that completion instead and requests nothing upstream.
        func request(_ demand: Subscribers.Demand) -> Subscribers.Demand {
            lock.lock()
            defer { lock.unlock() }

            if let pending = completion {
                finish(with: pending)
                self.demand = .none
                return .none
            }
            self.demand += demand
            return demand
        }

        /// Delivers the value as `Event.value` when there is outstanding
        /// demand; otherwise drops it and returns `.none`.
        func downstream(_ value: Input) -> Subscribers.Demand {
            lock.lock()
            defer { lock.unlock() }

            guard let subscriber = subscriber, demand > .none else {
                return .none
            }
            let adjust = subscriber.receive(Event.value(value))
            // One unit was consumed by this value; `adjust` is the extra
            // demand the subscriber just asked for.
            demand = demand - 1 + adjust
            return adjust
        }

        /// Delivers the completion right away when there is outstanding
        /// demand; otherwise stores it until the next `request(_:)`.
        func downstream(_ completion: Subscribers.Completion<Failure>) {
            lock.lock()
            defer { lock.unlock() }

            if demand > .none {
                finish(with: completion)
            } else {
                self.completion = completion
            }
        }

        /// Releases the subscriber so nothing more is delivered.
        func cancel() {
            lock.lock()
            defer { lock.unlock() }
            subscriber = nil
        }

        /// Emits the completion as a last `Event`, finishes the subscriber
        /// and releases it. Must be called with `lock` held.
        private func finish(with completion: Subscribers.Completion<Upstream.Failure>) {
            _ = subscriber?.receive(Event.completion(completion))
            subscriber?.receive(completion: .finished)
            subscriber = nil
        }
    }
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.Dematerialize {
    /// Helper that unwraps incoming `Event`s: values are forwarded to the
    /// subscriber, a completion event terminates it.
    private final class DownStreamHelper<S: Subscriber>: DownStreamHelperProtocol where S.Input == Output, S.Failure == Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private var subscriber: S?

        init(subscriber: S) {
            self.subscriber = subscriber
        }

        /// Passes the downstream demand through unchanged.
        func request(_ demand: Subscribers.Demand) -> Subscribers.Demand {
            return demand
        }

        /// Forwards a wrapped value, or completes and releases the
        /// subscriber when the event is a completion. Returns `.none` once
        /// the subscriber has been released.
        func downstream(_ event: Input) -> Subscribers.Demand {
            guard let receiver = subscriber else {
                return .none
            }
            switch event {
            case .value(let payload):
                return receiver.receive(payload)
            case .completion(let terminal):
                receiver.receive(completion: terminal)
                subscriber = nil
                return .none
            }
        }

        /// Handles the upstream's own completion (upstream cannot fail, so
        /// the subscriber is simply finished) and releases the subscriber.
        func downstream(_ completion: Subscribers.Completion<Failure>) {
            if let receiver = subscriber {
                receiver.receive(completion: .finished)
            }
            subscriber = nil
        }

        /// Releases the subscriber so nothing more is delivered.
        func cancel() {
            subscriber = nil
        }
    }
}
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
    /// Combines each value emitted by this publisher with the most recent
    /// value of `other`, if `other` has emitted one.
    ///
    /// - Parameters:
    ///   - other: Second publisher, the source of the "latest" value.
    ///   - resultSelector: Invoked for each value of this publisher together
    ///     with the latest value of `other`; its result is emitted.
    /// - Returns: A publisher of the values produced by `resultSelector`.
    func withLatestFrom<Other: Publisher, Result>(_ other: Other,
                                                  resultSelector: @escaping (Output, Other.Output) -> Result)
        -> Publishers.WithLatestFrom<Self, Other, Result> {
        let combined = Publishers.WithLatestFrom<Self, Other, Result>(
            upstream: self,
            other: other,
            resultSelector: resultSelector
        )
        return combined
    }

    /// Emits the most recent value of `other`, if any, each time this
    /// publisher emits.
    ///
    /// - Parameter other: Second publisher, the source of the emitted values.
    /// - Returns: A publisher of the latest values of `other`.
    func withLatestFrom<Other: Publisher>(_ other: Other)
        -> Publishers.WithLatestFrom<Self, Other, Other.Output> {
        let sampled = Publishers.WithLatestFrom<Self, Other, Other.Output>(
            upstream: self,
            other: other,
            resultSelector: { _, latest in latest }
        )
        return sampled
    }
}
// MARK: - Publisher
extension Publishers {
    /// Publisher that, for each value of `Upstream`, emits the result of
    /// combining it with the latest value received from `Other`.
    @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public struct WithLatestFrom<Upstream: Publisher,
                                 Other: Publisher,
                                 Output>: Publisher where Upstream.Failure == Other.Failure {
        public typealias Failure = Upstream.Failure
        public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output

        private let upstream: Upstream
        private let other: Other
        private let resultSelector: ResultSelector

        /// - Parameters:
        ///   - upstream: Publisher whose emissions trigger output.
        ///   - other: Publisher providing the latest value to combine with.
        ///   - resultSelector: Combines an upstream value with the latest
        ///     value of `other`.
        init(upstream: Upstream,
             other: Other,
             resultSelector: @escaping ResultSelector) {
            self.upstream = upstream
            self.other = other
            self.resultSelector = resultSelector
        }

        /// Creates a helper for `subscriber` (the helper keeps the latest
        /// value of `other`) and hands the subscriber a
        /// `DownstreamSubscription` bound to `upstream`.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let helper = DownStreamHelper(subscriber: subscriber,
                                          other: other,
                                          resultSelector: resultSelector)
            let sub = DownstreamSubscription(upstream: upstream, downStream: helper)
            subscriber.receive(subscription: sub)
        }
    }
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.WithLatestFrom {
    /// Helper that remembers the latest value of `other` and combines it with
    /// every value arriving from upstream.
    private final class DownStreamHelper<S: Subscriber>: DownStreamHelperProtocol where S.Input == Output, S.Failure == Upstream.Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private var subscriber: S?
        private var resultSelector: ResultSelector?
        private var latestValue: Other.Output?
        private var otherCancelable: Cancellable?

        /// Stores the subscriber and selector and immediately starts
        /// observing `other`; its completion is ignored, its values
        /// overwrite `latestValue`.
        init(subscriber: S,
             other: Other,
             resultSelector: @escaping ResultSelector) {
            self.subscriber = subscriber
            self.resultSelector = resultSelector
            self.otherCancelable = other
                .sink(receiveCompletion: { _ in },
                      receiveValue: { [weak self] value in
                        self?.latestValue = value
                })
        }

        /// Passes the downstream demand through unchanged.
        func request(_ demand: Subscribers.Demand) -> Subscribers.Demand {
            return demand
        }

        /// Combines the upstream value with the latest value of `other` and
        /// forwards the result.
        ///
        /// Returns `.none` after cancellation or completion. While `other`
        /// has not emitted yet, the upstream value is dropped and `.max(1)`
        /// is returned so the dropped value does not consume downstream's
        /// demand.
        func downstream(_ value: Input) -> Subscribers.Demand {
            guard let resultSelector = resultSelector,
                let subscriber = subscriber else { return .none }
            guard let latest = latestValue else { return .max(1) }
            return subscriber.receive(resultSelector(value, latest))
        }

        /// Forwards the upstream completion, then releases everything the
        /// helper holds, including the subscription to `other`, which would
        /// otherwise stay active for as long as the helper is alive.
        func downstream(_ completion: Subscribers.Completion<Failure>) {
            subscriber?.receive(completion: completion)
            releaseResources()
        }

        /// Stops delivering values and cancels the observation of `other`.
        func cancel() {
            releaseResources()
        }

        /// Drops the subscriber, selector and cached value and cancels the
        /// subscription to `other`.
        private func releaseResources() {
            subscriber = nil
            resultSelector = nil
            latestValue = nil
            otherCancelable?.cancel()
            otherCancelable = nil
        }
    }
}
import Combine
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
    /// Mirrors the lifetime of a subscription into an activity indicator.
    ///
    /// `true` is sent to `indicator` when a subscription is received, and
    /// `false` when the publisher completes or the subscription is cancelled,
    /// so the indicator does not stay "active" after a cancellation.
    ///
    /// - Parameter indicator: Subject that receives the activity state;
    ///   passing `nil` makes the operator a pass-through.
    /// - Returns: This publisher with the side effects attached.
    func trackActivity(with indicator: CurrentValueSubject<Bool, Never>?) -> Publishers.HandleEvents<Self> {
        return handleEvents(receiveSubscription: { _ in
            indicator?.send(true)
        }, receiveCompletion: { _ in
            indicator?.send(false)
        }, receiveCancel: {
            indicator?.send(false)
        })
    }
}
@serbats
Copy link
Author

serbats commented Feb 12, 2020

Example 1 withLatestFrom:

var subscriptions: [AnyCancellable] = []

let subject1 = PassthroughSubject<String, Never>()
let subject2 = PassthroughSubject<Int, Never>()

subject1
    .withLatestFrom(subject2) { ($0, $1) }
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

subject2.send(1)
subject1.send("First")
subject1.send("Second")
subject2.send(2)
subject1.send("Third")
subject1.send(completion: .finished)

Gives:

("First", 1)
("Second", 1)
("Third", 2)
finished

Example 2 withLatestFrom:

var subscriptions: [AnyCancellable] = []

let subject1 = PassthroughSubject<Void, Never>()
let subject2 = PassthroughSubject<Int, Never>()

subject1
    .withLatestFrom(subject2)
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

subject2.send(1)
subject1.send()
subject1.send()
subject2.send(2)
subject1.send()
subject1.send(completion: .finished)

Gives:

1
1
2
finished

Example 3 materialize:

var subscriptions: [AnyCancellable] = []

enum MyError: Error {
    case testError
}

let subject = PassthroughSubject<Int, MyError>()

subject
    .materialize()
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

subject.send(1)
subject.send(2)
subject.send(completion: .failure(.testError))

Gives:

value(1)
value(2)
completion(Combine.Subscribers.Completion<__lldb_expr_5.MyError>.failure(__lldb_expr_5.MyError.testError))
finished

Example 4 materialize(forceRecieveCompletion: false):

enum MyError: Error {
    case testError
}

var subscriptionExt: Subscription?

let subscriber = AnySubscriber<Event<Int, MyError>, Never>(receiveSubscription: { (subscription) in
    subscription.request(.max(2))
    subscriptionExt = subscription
}, receiveValue: { value in
    debugPrint("value received - \(value)")
    
    if case .value(let intValue) = value {
        return intValue != 3 ? .max(1) : .none
    }
    
    return .max(1)
}) {
    print($0)
}

let intSubject = PassthroughSubject<Int, MyError>()

intSubject
    .materialize(forceRecieveCompletion: false)
    .subscribe(subscriber)

intSubject.send(1)
intSubject.send(2)
intSubject.send(3)
intSubject.send(4)
intSubject.send(3)
intSubject.send(5)
intSubject.send(6)
intSubject.send(completion: .failure(.testError))
intSubject.send(7)
intSubject.send(8)
/*subscriptionExt?.request(.max(1))*/

//- if the line above is uncommented, the following is added to the output:
//"value received - completion(Combine.Subscribers.Completion<__lldb_expr_71.MyError>.failure(__lldb_expr_71.MyError.testError))"
//finished

Gives:

"value received - value(1)"
"value received - value(2)"
"value received - value(3)"
"value received - value(4)"
"value received - value(3)"

Example 5 dematerialize:


var subscriptions: [AnyCancellable] = []

enum MyError: Error {
    case testError
}

let subject = PassthroughSubject<Event<Int, MyError>, Never>()

subject
    .dematerialize()
    .sink(receiveCompletion: { print($0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

subject.send(.value(1))
subject.send(.value(2))
subject.send(.completion(.failure(.testError)))
subject.send(.value(3))
subject.send(.value(4))
subject.send(completion: .finished)

Gives:

1
2
failure(__lldb_expr_5.MyError.testError)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment