Skip to content

Instantly share code, notes, and snippets.

@huynguyencong
Created April 4, 2023 22:03
Show Gist options
  • Save huynguyencong/32f0ba13f81c00898702501f4db47e6d to your computer and use it in GitHub Desktop.
Save huynguyencong/32f0ba13f81c00898702501f4db47e6d to your computer and use it in GitHub Desktop.
Custom Publisher and Subscriber with Combine framework
import Combine
/// A publisher that emits an incrementing counter value (1, 2, 3, …) once every second.
///
/// Every subscriber gets its own `CounterSubscription`, and therefore its own counter and
/// timer. An optional `Completion` makes the stream finish or fail once the counter
/// reaches a given value; without one the stream runs until it is cancelled.
class CounterPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Error

    /// Optional terminal event passed on to every subscription this publisher creates.
    private let completion: Completion?

    /// - Parameter completion: When and how the stream terminates, or `nil` (the default)
    ///   for a stream that never terminates by itself.
    init(completion: Completion? = nil) {
        self.completion = completion
    }

    /// Creates a dedicated subscription for `subscriber` and hands it over.
    /// The timer is only started once the subscriber calls `request(_:)`.
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = CounterSubscription(subscriber: subscriber, completion: completion)
        subscriber.receive(subscription: subscription)
    }

    /// Describes how and when the counter stream terminates.
    enum Completion {
        /// Fail with `error` once the counter equals `at`.
        case failure(at: Int, error: Error)
        /// Finish normally once the counter equals `at`.
        case finished(at: Int)
    }

    /// The subscription that drives a single subscriber from a repeating one-second timer.
    class CounterSubscription<S>: Subscription where S: Subscriber, S.Input == Int, S.Failure == Error {
        /// The downstream subscriber; set to `nil` on cancellation or completion so the
        /// subscriber <-> subscription reference cycle is broken.
        private var subscriber: S?
        private let completion: Completion?
        /// The last value that was emitted (0 before the first emission).
        private var count = 0
        private var timer: Timer?
        /// Demand that has been requested but not yet satisfied.
        private var demand: Subscribers.Demand = .none

        init(subscriber: S, completion: Completion?) {
            self.subscriber = subscriber
            self.completion = completion
        }

        /// Adds `demand` to the outstanding demand and starts the timer on first use.
        ///
        /// `Timer.scheduledTimer` installs the timer on the current thread's run loop,
        /// so this must be called on a thread whose run loop is running.
        func request(_ demand: Subscribers.Demand) {
            // Demand is cumulative: a later request adds to what is still outstanding.
            self.demand += demand

            // Do nothing once terminated, and never schedule a second timer.
            guard subscriber != nil, timer == nil else { return }

            // The strong capture of `self` is intentional: the running timer keeps the
            // subscription alive, and `cancel()` breaks the cycle by invalidating it.
            timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
                self.tick()
            }
        }

        /// Stops the timer and releases the subscriber. Safe to call more than once.
        func cancel() {
            timer?.invalidate()
            timer = nil
            subscriber = nil
        }

        /// Emits the next value if there is demand, then delivers the terminal event
        /// when the counter has reached the configured position.
        private func tick() {
            guard let subscriber = subscriber else { return }

            if demand > 0 {
                count += 1
                // Consume one unit of demand, then add whatever the subscriber asks for.
                demand -= 1
                demand += subscriber.receive(count)
            }

            // The subscriber may have cancelled from inside `receive(_:)`.
            guard self.subscriber != nil, let completion = completion else { return }

            switch completion {
            case .finished(let at):
                if count == at {
                    terminate(with: .finished)
                }
            case .failure(let at, let error):
                if count == at {
                    terminate(with: .failure(error))
                }
            }
        }

        /// Tears the subscription down and then delivers `event`, so the terminal event
        /// is sent exactly once and nothing is emitted after it.
        private func terminate(with event: Subscribers.Completion<S.Failure>) {
            guard let subscriber = subscriber else { return }
            cancel()
            subscriber.receive(completion: event)
        }
    }
}
import SwiftUI
import Combine
/// Demo screen with two stacked buttons: "Subscribe" asks the view model to start a
/// subscription and "Cancel" asks it to cancel.
struct ContentView: View {
    @StateObject private var viewModel = ViewModel()

    var body: some View {
        VStack {
            subscribeButton
            cancelButton
        }
        .padding()
    }

    /// Forwards a tap to `ViewModel.subscribe()`.
    private var subscribeButton: some View {
        Button("Subscribe", action: { viewModel.subscribe() })
    }

    /// Forwards a tap to `ViewModel.cancel()`.
    private var cancelButton: some View {
        Button("Cancel", action: { viewModel.cancel() })
    }
}
/// Owns the counter subscriptions started from the demo screen.
class ViewModel: ObservableObject {
    /// Error type declared for this view model; not referenced by the code shown here.
    enum ViewModelError: Error {
        case exceed
    }

    /// Cancellation tokens for every subscription started so far. An `AnyCancellable`
    /// cancels its subscription when it is deallocated.
    private var subscriptions = Set<AnyCancellable>()

    /// Starts a new counter that finishes at 5 and prints each value and the completion.
    /// Every call adds another, independent subscription.
    func subscribe() {
        let counter = CounterPublisher(completion: .finished(at: 5))
        let token = counter.mySink(
            receiveCompletion: { completion in
                print("Completion: \(completion)")
            },
            receiveValue: { value in
                print("Value: \(value)")
            }
        )
        subscriptions.insert(token)
    }

    /// Drops every stored token, which cancels all running subscriptions.
    func cancel() {
        subscriptions.removeAll()
    }
}
import Combine
/// A minimal closure-based subscriber, similar in spirit to Combine's built-in sink.
///
/// It requests unlimited demand as soon as it is subscribed, forwards every value and
/// the terminal event to the supplied closures, and can be cancelled at any time.
class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
    /// The upstream subscription; cleared on cancellation and after completion so that
    /// the sink and the subscription do not keep each other alive.
    private var subscription: Subscription?
    private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
    private let receiveValue: (Input) -> Void

    /// - Parameters:
    ///   - receiveCompletion: Called with the terminal event of the stream.
    ///   - receiveValue: Called with every value the stream delivers.
    init(receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void, receiveValue: @escaping (Input) -> Void) {
        self.receiveCompletion = receiveCompletion
        self.receiveValue = receiveValue
    }

    /// Stores the subscription and immediately requests unlimited demand.
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }

    /// Forwards `input` to the value closure.
    /// - Returns: `.none`, because unlimited demand was already requested up front.
    func receive(_ input: Input) -> Subscribers.Demand {
        receiveValue(input)
        return .none
    }

    /// Forwards the terminal event, then cancels and releases the subscription.
    func receive(completion: Subscribers.Completion<Failure>) {
        receiveCompletion(completion)
        // The explicit cancel is kept so an upstream that relies on it still stops.
        cancel()
    }

    /// Cancels the upstream subscription and drops the reference to it.
    /// Calling this again afterwards is a no-op.
    func cancel() {
        // Clear the stored reference first so a re-entrant call finds nothing to cancel.
        let active = subscription
        subscription = nil
        active?.cancel()
    }
}
extension Publisher {
    /// Subscribes a `MySink` built from the two closures to this publisher.
    ///
    /// - Parameters:
    ///   - receiveCompletion: Called with the terminal event of the stream.
    ///   - receiveValue: Called with every value the publisher emits.
    /// - Returns: An `AnyCancellable` wrapping the sink.
    func mySink(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> AnyCancellable {
        let sink = MySink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        self.subscribe(sink)
        return AnyCancellable(sink)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment