@fmo91
Last active December 29, 2020 13:43
A very simple but handy Publishers extension for creating a custom AnyPublisher, in the style of RxSwift's Observable<E>.create method
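// Publishers+Create.swift
// Copy and paste this into your project.
import Combine
import Foundation

extension Publishers {
    /// Handle passed to the `create` closure. It forwards values and the
    /// completion event to the underlying subject.
    struct PublisherObserver<V, E: Swift.Error> {
        private let sendValue: (V) -> Void
        private let sendCompletion: (Subscribers.Completion<E>) -> Void

        init(
            _ sendValue: @escaping (V) -> Void,
            _ sendCompletion: @escaping (Subscribers.Completion<E>) -> Void
        ) {
            self.sendValue = sendValue
            self.sendCompletion = sendCompletion
        }

        /// Emits a value to the subscribers.
        func send(_ value: V) {
            sendValue(value)
        }

        /// Ends the stream, either with `.finished` or with `.failure`.
        func send(completion: Subscribers.Completion<E>) {
            sendCompletion(completion)
        }
    }

    /// Builds an `AnyPublisher` backed by a `PassthroughSubject`.
    /// Note: `observer` runs once, immediately, when `create` is called
    /// (not once per subscription), so values sent synchronously inside it,
    /// before anyone has subscribed, are dropped by the subject.
    static func create<V, E: Swift.Error>(
        observer: (PublisherObserver<V, E>) -> Void
    ) -> AnyPublisher<V, E> {
        let subject = PassthroughSubject<V, E>()
        observer(PublisherObserver(subject.send, subject.send(completion:)))
        return subject.eraseToAnyPublisher()
    }
}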

// Example playground (second file of this gist)
import Combine
import Foundation
import PlaygroundSupport

enum AsyncOperationError: Swift.Error {
    case sample
}

func executeAsyncOperationGeneric() -> AnyPublisher<Int, Swift.Error> {
    return Publishers.create { observer in
        // Emit the values asynchronously, two seconds after creation.
        DispatchQueue.main.asyncAfter(deadline: .now() + 2.0, execute: {
            observer.send(10)
            observer.send(20)
            observer.send(15)
            observer.send(20)
            observer.send(15)
            // Uncomment one of these to end the stream:
            // observer.send(completion: .failure(AsyncOperationError.sample))
            // observer.send(completion: .finished)
        })
    }
}

var cancellables = [AnyCancellable]()

executeAsyncOperationGeneric()
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .failure(let error):
                print("Received error: \(error)")
            case .finished:
                print("Finished successfully")
            }
        },
        receiveValue: { value in
            print("Received value: \(value)")
        }
    )
    .store(in: &cancellables)

// Keep the playground alive long enough for the delayed values to arrive.
PlaygroundPage.current.needsIndefiniteExecution = true

fmo91 commented Dec 28, 2020

Are you using the Combine framework but missing RxSwift's Observable<E>.create method?
If so, this extension may help. Add the first file, Publishers+Create.swift, to your project and call Publishers.create as shown in the example in the second file of this gist.
Take a look at executeAsyncOperationGeneric: it uses create to build an AnyPublisher<Int, Error>. Both completion calls are commented out, so the stream never completes. About two seconds after the publisher is created, the example prints:

Received value: 10
Received value: 20
Received value: 15
Received value: 20
Received value: 15
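
One thing worth keeping in mind: create is built on a PassthroughSubject and runs its closure right away, so the timing of send matters. The example above works because the values are sent two seconds later, after sink has subscribed. The sketch below is not part of the gist. It uses a bare PassthroughSubject (the same type create wraps) to illustrate how values sent before a subscription, or after completion, are not delivered. The names subject, received and cancellable are only for illustration.

```swift
import Combine

// The same kind of subject that Publishers.create uses internally.
let subject = PassthroughSubject<Int, Never>()

// Sent before anyone subscribes: a PassthroughSubject does not buffer,
// so this value is dropped.
subject.send(1)

var received = [Int]()
let cancellable = subject.sink { value in
    received.append(value)
}

// Sent after subscribing: delivered to the sink.
subject.send(2)

// After completion, further values are ignored.
subject.send(completion: .finished)
subject.send(3)

print(received) // [2]
```

If you need values that are produced synchronously inside the closure, one option is to delay the work (as the gist's example does with asyncAfter) so that the subscription is in place first.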
