Skip to content

Instantly share code, notes, and snippets.

@dchohfi
Created November 5, 2019 19:56
Show Gist options
  • Save dchohfi/92e5babb47f5f136a8986fca17acc47b to your computer and use it in GitHub Desktop.
Save dchohfi/92e5babb47f5f136a8986fca17acc47b to your computer and use it in GitHub Desktop.
Swift inout scan
extension Publisher {
public func inoutScan<Result>(
_ initialResult: Result,
_ nextPartialResult: @escaping (inout Result, Output) -> Void
) -> Publishers.InoutScan<Self, Result> {
return .init(upstream: self,
initialResult: initialResult,
nextPartialResult: nextPartialResult)
}
public func tryInoutScan<Result>(
_ initialResult: Result,
_ nextPartialResult: @escaping (inout Result, Output) throws -> Void
) -> Publishers.TryInoutScan<Self, Result> {
return .init(upstream: self,
initialResult: initialResult,
nextPartialResult: nextPartialResult)
}
}
extension Publishers {
public struct InoutScan<Upstream: Publisher, Output>: Publisher {
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let initialResult: Output
public let nextPartialResult: (inout Output, Upstream.Output) -> Void
public init(upstream: Upstream,
initialResult: Output,
nextPartialResult: @escaping (inout Output, Upstream.Output) -> Void) {
self.upstream = upstream
self.initialResult = initialResult
self.nextPartialResult = nextPartialResult
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Upstream.Failure == Downstream.Failure
{
upstream.subscribe(Inner(downstream: subscriber,
initialResult: initialResult,
nextPartialResult: nextPartialResult))
}
}
public struct TryInoutScan<Upstream: Publisher, Output>: Publisher {
public typealias Failure = Error
public let upstream: Upstream
public let initialResult: Output
public let nextPartialResult: (inout Output, Upstream.Output) throws -> Void
public init(
upstream: Upstream,
initialResult: Output,
nextPartialResult: @escaping (inout Output, Upstream.Output) throws -> Void
) {
self.upstream = upstream
self.initialResult = initialResult
self.nextPartialResult = nextPartialResult
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Downstream.Failure == Error
{
upstream.subscribe(Inner(downstream: subscriber,
initialResult: initialResult,
nextPartialResult: nextPartialResult))
}
}
}
extension Publishers.InoutScan {
private final class Inner<Downstream: Subscriber>
: Subscriber,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Upstream.Failure == Downstream.Failure
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let nextPartialResult: (inout Downstream.Input, Input) -> Void
private var result: Downstream.Input
fileprivate init(
downstream: Downstream,
initialResult: Downstream.Input,
nextPartialResult: @escaping (inout Downstream.Input, Input) -> Void
)
{
self.downstream = downstream
self.result = initialResult
self.nextPartialResult = nextPartialResult
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Input) -> Subscribers.Demand {
nextPartialResult(&result, input)
return downstream.receive(result)
}
func receive(completion: Subscribers.Completion<Failure>) {
downstream.receive(completion: completion)
}
var description: String { return "Scan" }
var customMirror: Mirror {
let children: [Mirror.Child] = [
("downstream", downstream),
("result", result)
]
return Mirror(self, children: children)
}
var playgroundDescription: Any { return description }
}
}
extension Publishers.TryInoutScan {
private final class Inner<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Failure == Error
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let nextPartialResult: (inout Downstream.Input, Input) throws -> Void
private var result: Downstream.Input
private var status = SubscriptionStatus.awaitingSubscription
private var lock: NSLock? = NSLock()
private var finished = false
fileprivate init(
downstream: Downstream,
initialResult: Downstream.Input,
nextPartialResult:
@escaping (inout Downstream.Input, Input) throws -> Void
) {
self.downstream = downstream
self.nextPartialResult = nextPartialResult
self.result = initialResult
}
deinit {
lock = nil
}
func receive(subscription: Subscription) {
lock?.lock()
guard case .awaitingSubscription = status else {
lock?.unlock()
subscription.cancel()
return
}
status = .subscribed(subscription)
lock?.unlock()
downstream.receive(subscription: self)
}
func receive(_ input: Input) -> Subscribers.Demand {
do {
try nextPartialResult(&result, input)
return downstream.receive(result)
} catch {
lock?.lock()
guard case let .subscribed(subscription) = status else {
lock?.unlock()
return .none
}
status = .terminal
lock?.unlock()
subscription.cancel()
downstream.receive(completion: .failure(error))
return .none
}
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
// Combine doesn't use locking in this method!
guard case .subscribed = status else {
return
}
downstream.receive(completion: .finished)
}
func request(_ demand: Subscribers.Demand) {
lock?.lock()
guard case let .subscribed(subscription) = status else {
lock?.unlock()
return
}
lock?.unlock()
subscription.request(demand)
}
func cancel() {
lock?.lock()
guard case let .subscribed(subscription) = status else {
lock?.unlock()
return
}
status = .terminal
lock?.unlock()
subscription.cancel()
}
var description: String { return "TryScan" }
var customMirror: Mirror {
lock?.lock()
defer { lock?.unlock() }
let children: [Mirror.Child] = [
("downstream", downstream),
("status", status),
("result", result)
]
return Mirror(self, children: children)
}
var playgroundDescription: Any { return description }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment