Skip to content

Instantly share code, notes, and snippets.

@sherlock1982
Created December 9, 2019 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sherlock1982/cfe3aeb997c62cfd7d42308282dfd1ee to your computer and use it in GitHub Desktop.
Save sherlock1982/cfe3aeb997c62cfd7d42308282dfd1ee to your computer and use it in GitHub Desktop.
Proper Combine switchMap operator based on OpenCombine sources
//
// Publishers.FlatMap.swift
//
// Created by Eric Patey on 16.08.2019.
// Modified by Nickolay Orekhov
// SwitchMap operator based on FlatMap
import Combine
// import COpenCombineHelpers
class UnfairLock {
private var lockObject = os_unfair_lock()
init(){
}
deinit{
}
func lock(){
os_unfair_lock_lock(&lockObject)
}
func unlock(){
os_unfair_lock_unlock(&lockObject)
}
}
extension Publisher {
/// Transforms all elements from an upstream publisher into a new or existing
/// publisher.
///
/// `flatMap` merges the output from all returned publishers into a single stream of
/// output.
///
/// - Parameters:
/// - maxPublishers: The maximum number of publishers produced by this method.
/// - transform: A closure that takes an element as a parameter and returns a
/// publisher that produces elements of that type.
/// - Returns: A publisher that transforms elements from an upstream publisher into
/// a publisher of that element’s type.
public func switchMap<Result, Child: Publisher>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Output) -> Child
) -> Publishers.SwitchMap<Child, Self>
where Result == Child.Output, Failure == Child.Failure {
return .init(upstream: self,
maxPublishers: maxPublishers,
transform: transform)
}
}
extension Publishers {
public struct SwitchMap<Child: Publisher, Upstream: Publisher>: Publisher
where Child.Failure == Upstream.Failure
{
public typealias Output = Child.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let maxPublishers: Subscribers.Demand
public let transform: (Upstream.Output) -> Child
public init(upstream: Upstream, maxPublishers: Subscribers.Demand,
transform: @escaping (Upstream.Output) -> Child) {
self.upstream = upstream
self.maxPublishers = maxPublishers
self.transform = transform
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Child.Output == Downstream.Input, Upstream.Failure == Downstream.Failure
{
let inner = Inner(downstream: subscriber,
maxPublishers: maxPublishers,
map: transform)
subscriber.receive(subscription: inner)
upstream.subscribe(inner)
}
}
}
extension Publishers.SwitchMap {
private final class Inner<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Input == Child.Output, Downstream.Failure == Upstream.Failure
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private typealias SubscriptionIndex = Int
/// All requests to this subscription should be made with the `outerLock`
/// acquired.
private var outerSubscription: Subscription?
// Must be recursive lock. Probably a bug in Combine.
/// The lock for requesting from `outerSubscription`.
private let outerLock = UnfairLock()
/// The lock for modifying the state. All mutable state here should be
/// read and modified with this lock acquired.
/// The only exception is the `downstreamRecursive` field, which is guarded
/// by the `downstreamLock`.
private let lock = UnfairLock()
// Must be recursive lock. Probably a bug in Combine.
/// All the calls to the downstream subscriber should be made with this lock
/// acquired.
private let downstreamLock = UnfairLock()
private let downstream: Downstream
private var downstreamDemand = Subscribers.Demand.none
/// This variable is set to `true` whenever we call `downstream.receive(_:)`,
/// and then set back to `false`.
private var downstreamRecursive = false
private var innerRecursive = false
private var innerSubscription: Subscription? = nil
private var currentInnerIndex: SubscriptionIndex = 0
private var pendingSubscriptions = 0
private var buffer = [Child.Output]()
private let maxPublishers: Subscribers.Demand
private let map: (Input) -> Child
private var cancelledOrCompleted = false
private var outerFinished = false
init(downstream: Downstream,
maxPublishers: Subscribers.Demand,
map: @escaping (Upstream.Output) -> Child) {
self.downstream = downstream
self.maxPublishers = maxPublishers
self.map = map
}
deinit {
// outerLock.deallocate()
// lock.deallocate()
// downstreamLock.deallocate()
}
// MARK: - Subscriber
fileprivate func receive(subscription: Subscription) {
lock.lock()
guard outerSubscription == nil, !cancelledOrCompleted else {
lock.unlock()
subscription.cancel()
return
}
outerSubscription = subscription
lock.unlock()
subscription.request(maxPublishers)
}
fileprivate func receive(_ input: Upstream.Output) -> Subscribers.Demand {
lock.lock()
let cancelledOrCompleted = self.cancelledOrCompleted
lock.unlock()
if cancelledOrCompleted {
return .none
}
let child = map(input)
lock.lock()
currentInnerIndex += 1
let innerIndex = currentInnerIndex
pendingSubscriptions += 1
lock.unlock()
child.subscribe(Side(index: innerIndex, inner: self))
return .none
}
fileprivate func receive(completion: Subscribers.Completion<Child.Failure>) {
outerSubscription = nil
lock.lock()
outerFinished = true
switch completion {
case .finished:
releaseLockThenSendCompletionDownstreamIfNeeded(outerFinished: true)
return
case .failure:
let wasAlreadyCancelledOrCompleted = cancelledOrCompleted
cancelledOrCompleted = true
innerSubscription = nil
lock.unlock()
if wasAlreadyCancelledOrCompleted {
return
}
downstreamLock.lock()
downstream.receive(completion: completion)
downstreamLock.unlock()
}
}
// MARK: - Subscription
fileprivate func request(_ demand: Subscribers.Demand) {
// demand.assertNonZero()
if downstreamRecursive {
// downstreamRecursive being true means that downstreamLock
// is already acquired.
downstreamDemand += demand
return
}
lock.lock()
if cancelledOrCompleted {
lock.unlock()
return
}
if demand == .unlimited {
downstreamDemand = .unlimited
let buffer = self.buffer
self.buffer = []
lock.unlock()
downstreamLock.lock()
downstreamRecursive = true
for childOutput in buffer {
_ = downstream.receive(childOutput)
}
downstreamRecursive = false
downstreamLock.unlock()
innerSubscription?.request(.unlimited)
lock.lock()
} else {
downstreamDemand += demand
while !buffer.isEmpty && downstreamDemand > 0 {
// FIXME: This has quadratic complexity.
// This is what Combine does.
// Can we improve perfomance by using e. g. Deque instead of Array?
// Or array's cache locality makes this solution more efficient?
// Must benchmark before optimizing!
//
// https://www.cocoawithlove.com/blog/2016/09/22/deque.html
let value = buffer.removeFirst()
downstreamDemand -= 1
let subscription = innerSubscription
lock.unlock()
downstreamLock.lock()
downstreamRecursive = true
let additionalDemand = downstream.receive(value)
downstreamRecursive = false
downstreamLock.unlock()
if additionalDemand != .none {
lock.lock()
downstreamDemand += additionalDemand
lock.unlock()
}
if let subscription = subscription {
innerRecursive = true
subscription.request(.max(1))
innerRecursive = false
}
lock.lock()
}
}
releaseLockThenSendCompletionDownstreamIfNeeded(outerFinished: outerFinished)
}
fileprivate func cancel() {
lock.lock()
cancelledOrCompleted = true
let subscription = self.innerSubscription
self.innerSubscription = nil
lock.unlock()
subscription?.cancel()
// Combine doesn't acquire the lock here. Weird.
outerSubscription?.cancel()
outerSubscription = nil
}
// MARK: - Reflection
fileprivate var description: String { return "FlatMap" }
fileprivate var customMirror: Mirror {
return Mirror(self, children: EmptyCollection())
}
fileprivate var playgroundDescription: Any { return description }
// MARK: - Private
private func receiveInner(subscription: Subscription,
_ index: SubscriptionIndex) {
lock.lock()
pendingSubscriptions -= 1
if index == currentInnerIndex {
innerSubscription = subscription
}
let demand = downstreamDemand == .unlimited
? Subscribers.Demand.unlimited
: .max(1)
lock.unlock()
subscription.request(demand)
}
private func receiveInner(_ input: Child.Output,
_ index: SubscriptionIndex) -> Subscribers.Demand {
lock.lock()
if index < currentInnerIndex {
lock.unlock()
return .none
}
if downstreamDemand == .unlimited {
lock.unlock()
downstreamLock.lock()
downstreamRecursive = true
_ = downstream.receive(input)
downstreamRecursive = false
downstreamLock.unlock()
return .none
}
if downstreamDemand == .none || innerRecursive {
buffer.append(input)
lock.unlock()
return .none
}
downstreamDemand -= 1
lock.unlock()
downstreamLock.lock()
downstreamRecursive = true
let newDemand = downstream.receive(input)
downstreamRecursive = false
downstreamLock.unlock()
if newDemand > 0 {
lock.lock()
downstreamDemand += newDemand
lock.unlock()
}
return .max(1)
}
private func receiveInner(completion: Subscribers.Completion<Child.Failure>,
_ index: SubscriptionIndex) {
switch completion {
case .finished:
lock.lock()
if index < currentInnerIndex {
lock.unlock()
return
}
innerSubscription = nil
let downstreamCompleted = releaseLockThenSendCompletionDownstreamIfNeeded(
outerFinished: outerFinished
)
if !downstreamCompleted {
requestOneMorePublisher()
}
case .failure:
lock.lock()
if index < currentInnerIndex || cancelledOrCompleted {
lock.unlock()
return
}
cancelledOrCompleted = true
let subscription = self.innerSubscription
self.innerSubscription = nil
lock.unlock()
subscription?.cancel()
downstreamLock.lock()
downstream.receive(completion: completion)
downstreamLock.unlock()
}
}
private func requestOneMorePublisher() {
if maxPublishers != .unlimited {
outerLock.lock()
outerSubscription?.request(.max(1))
outerLock.unlock()
}
}
/// - Precondition: `lock` is acquired
/// - Postcondition: `lock` is released
///
/// - Returns: `true` if a completion was sent downstream
@discardableResult
private func releaseLockThenSendCompletionDownstreamIfNeeded(
outerFinished: Bool
) -> Bool {
// #if DEBUG
// lock.assertOwner() // Sanity check
// #endif
if !cancelledOrCompleted && outerFinished && buffer.isEmpty &&
innerSubscription == nil && pendingSubscriptions == 0 {
cancelledOrCompleted = true
lock.unlock()
downstreamLock.lock()
downstream.receive(completion: .finished)
downstreamLock.unlock()
return true
}
lock.unlock()
return false
}
// MARK: - Side
private struct Side: Subscriber,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible {
private let index: SubscriptionIndex
private let inner: Inner
fileprivate let combineIdentifier = CombineIdentifier()
fileprivate init(index: SubscriptionIndex, inner: Inner) {
self.index = index
self.inner = inner
}
fileprivate func receive(subscription: Subscription) {
inner.receiveInner(subscription: subscription, index)
}
fileprivate func receive(_ input: Child.Output) -> Subscribers.Demand {
return inner.receiveInner(input, index)
}
fileprivate func receive(completion: Subscribers.Completion<Child.Failure>) {
inner.receiveInner(completion: completion, index)
}
fileprivate var description: String { return "FlatMap" }
fileprivate var customMirror: Mirror {
let children = CollectionOfOne<Mirror.Child>(
("parentSubscription", inner.combineIdentifier)
)
return Mirror(self, children: children)
}
fileprivate var playgroundDescription: Any { return description }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment