Skip to content

Instantly share code, notes, and snippets.

@fmo91
Created December 23, 2020 18:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fmo91/06510821a65fe798e02b485567ff50eb to your computer and use it in GitHub Desktop.
Save fmo91/06510821a65fe798e02b485567ff50eb to your computer and use it in GitHub Desktop.
EventStream implementation. It's something similar to Observable<Element> from RxSwift but just as a proof of concept.
import Foundation
/// A minimal push-based stream of `Event<Element>` values, loosely modelled on
/// RxSwift's `Observable<Element>` (proof of concept).
///
/// Events emitted while no handler is registered are buffered and replayed on
/// the next call to `subscribe`. Once a terminal event (`.completed` or
/// `.error`) has been delivered to the handlers, later events are dropped.
///
/// This type contains no synchronization of its own.
public final class EventStream<Element> {
    /// Registered handlers, keyed by the subscription returned from `subscribe`.
    private(set) var handlers: [StreamSubscription: StreamHandler<Element>] = [:]
    /// Events received while `handlers` was empty, in arrival order.
    private var eventsQueue: [Event<Element>] = []
    /// Set to `false` after a terminal event has been delivered.
    private var isEnabled: Bool = true

    private init() {}

    /// Creates a stream and synchronously runs `handler` with a sink that feeds it.
    ///
    /// - Parameter handler: Producer closure. It receives a `StreamHandler` whose
    ///   `send(_:)` pushes events into the new stream. The sink holds the stream
    ///   strongly, so a producer that keeps the sink also keeps the stream alive.
    /// - Returns: The newly created stream.
    public static func create(_ handler: @escaping (StreamHandler<Element>) -> Void) -> EventStream<Element> {
        let stream = EventStream<Element>()
        handler(.init({ (event: Event<Element>) in
            stream.sendIfPossible(event: event)
        }))
        return stream
    }

    /// Creates a stream that emits `value` followed by `.completed`.
    public static func just(_ value: Element) -> EventStream<Element> {
        return .create {
            $0.send(.next(value))
            $0.send(.completed)
        }
    }

    /// Creates a stream that emits only `.error(errorValue)`.
    public static func error(_ errorValue: Swift.Error) -> EventStream<Element> {
        return .create {
            $0.send(.error(errorValue))
        }
    }

    /// Delivers `event` to the handlers, or buffers it when there are none.
    /// Does nothing once the stream has been disabled by a terminal event.
    private func sendIfPossible(event: Event<Element>) {
        guard isEnabled else {
            return
        }
        if handlers.isEmpty {
            eventsQueue.append(event)
        } else {
            send(event: event)
        }
    }

    /// Forwards `event` to every registered handler and disables the stream
    /// afterwards if the event is terminal.
    private func send(event: Event<Element>) {
        defer {
            if event.isCompletion {
                isEnabled = false
            }
        }
        handlers.values.forEach { (handler: StreamHandler<Element>) in
            handler.send(event)
        }
    }

    /// Removes the handler registered under the subscription with the given id.
    private func dispose(withId id: UUID) {
        // `StreamSubscription` hashes and compares by `id` only, so a throwaway
        // value carrying the same id addresses the stored entry.
        handlers.removeValue(forKey: StreamSubscription(generator: { id }))
    }

    /// Replays the buffered events to the current handlers, stopping at the
    /// first terminal event; the remaining buffered events are discarded.
    private func streamEventsQueue() {
        guard !eventsQueue.isEmpty else {
            return
        }
        // Detach the backlog before replaying it: if a handler subscribes to
        // this stream again from inside its callback, the nested call must not
        // see (and re-deliver) the same buffered events.
        let pendingEvents = eventsQueue
        eventsQueue.removeAll()
        for event in pendingEvents {
            guard isEnabled else {
                break
            }
            send(event: event)
        }
    }

    /// Registers `handler` and replays any buffered events.
    ///
    /// - Parameter handler: Receives every event delivered from now on.
    /// - Returns: The subscription identifying this registration.
    public func subscribe(handler: StreamHandler<Element>) -> StreamSubscription {
        // Capture `self` weakly: the subscription is stored as a key in
        // `handlers`, so a strong capture would make the stream retain itself.
        let subscriptionIdentifier = StreamSubscription(dispose: { [weak self] id in
            self?.dispose(withId: id)
        })
        handlers[subscriptionIdentifier] = handler
        streamEventsQueue()
        return subscriptionIdentifier
    }

    /// Convenience overload that builds the handler from per-event closures.
    ///
    /// - Parameters:
    ///   - onNext: Called for every `.next` element. Defaults to a no-op.
    ///   - onError: Called for an `.error` event. Defaults to a no-op.
    ///   - onComplete: Called for a `.completed` event. Defaults to a no-op.
    /// - Returns: The subscription identifying this registration.
    public func subscribe(
        onNext: @escaping (Element) -> Void = { _ in },
        onError: @escaping (Swift.Error) -> Void = { _ in },
        onComplete: @escaping () -> Void = {}
    ) -> StreamSubscription {
        return subscribe(
            handler: StreamHandler<Element>(
                onNext: onNext,
                onError: onError,
                onComplete: onComplete
            )
        )
    }
}
/// Token identifying one handler registration on an `EventStream`.
///
/// Identity (equality and hashing) is defined by `id` alone; the `dispose`
/// closure takes no part in it.
public struct StreamSubscription: Hashable {
    /// Unique identifier of the registration.
    let id: UUID
    /// Closure stored at construction; it is handed an identifier when invoked.
    let dispose: (UUID) -> Void

    /// - Parameters:
    ///   - dispose: Closure to store. Defaults to a no-op.
    ///   - generator: Produces the identifier. Defaults to a fresh random `UUID`.
    init(dispose: @escaping (UUID) -> Void = { _ in }, generator: () -> UUID = { UUID() }) {
        id = generator()
        self.dispose = dispose
    }

    public static func == (lhs: StreamSubscription, rhs: StreamSubscription) -> Bool {
        lhs.id == rhs.id
    }

    public func hash(into hasher: inout Hasher) {
        id.hash(into: &hasher)
    }
}
/// Wraps a closure that consumes `Event<Element>` values.
public struct StreamHandler<Element> {
    /// The wrapped event consumer.
    private let callback: (Event<Element>) -> Void

    /// Wraps a closure that receives every event unmodified.
    public init(_ handler: @escaping (Event<Element>) -> Void) {
        callback = handler
    }

    /// Builds a handler that dispatches each event kind to its own closure.
    ///
    /// - Parameters:
    ///   - onNext: Called with the element of a `.next` event. Defaults to a no-op.
    ///   - onError: Called with the error of an `.error` event. Defaults to a no-op.
    ///   - onComplete: Called for a `.completed` event. Defaults to a no-op.
    public init(
        onNext: @escaping (Element) -> Void = { _ in },
        onError: @escaping (Swift.Error) -> Void = { _ in },
        onComplete: @escaping () -> Void = {}
    ) {
        self.init({ (incoming: Event<Element>) in
            switch incoming {
            case .completed:
                onComplete()
            case let .error(failure):
                onError(failure)
            case let .next(value):
                onNext(value)
            }
        })
    }

    /// Passes `event` to the wrapped closure.
    public func send(_ event: Event<Element>) {
        callback(event)
    }
}
/// A single notification carried by a stream: a value, a normal termination,
/// or a termination with an error.
public enum Event<Element> {
    case next(Element)
    case completed
    case error(Swift.Error)

    /// `true` for the terminal events (`.completed` and `.error`), `false` for `.next`.
    var isCompletion: Bool {
        switch self {
        case .next:
            return false
        case .error, .completed:
            return true
        }
    }

    /// Returns an event of the same kind with the `.next` payload transformed by `f`.
    /// `.error` and `.completed` pass through unchanged.
    func map<NewElement>(f: (Element) -> NewElement) -> Event<NewElement> {
        switch self {
        case .completed:
            return .completed
        case let .error(failure):
            return .error(failure)
        case let .next(value):
            let mapped = f(value)
            return .next(mapped)
        }
    }
}
extension EventStream {
    /// Returns a stream that emits `transform(element)` for every element of
    /// this stream and forwards `.error` / `.completed` unchanged.
    ///
    /// The subscription to this stream is made immediately, when the mapped
    /// stream is created.
    public func map<NewElement>(_ transform: @escaping (Element) -> NewElement) -> EventStream<NewElement> {
        return EventStream<NewElement>.create { (handler: StreamHandler<NewElement>) in
            _ = self.subscribe(
                onNext: { value in
                    handler.send(.next(transform(value)))
                },
                onError: { error in
                    handler.send(.error(error))
                },
                onComplete: {
                    handler.send(.completed)
                }
            )
        }
    }

    /// Returns a stream that merges the elements of every inner stream produced
    /// by applying `transform` to the elements of this stream.
    ///
    /// - An error from this stream or from any inner stream is forwarded.
    /// - `.completed` is emitted only after this stream has completed AND every
    ///   inner stream created so far has completed. An inner stream finishing
    ///   must not end the merged stream, and this stream finishing must not cut
    ///   off inner streams that are still going to emit.
    ///
    /// The subscription to this stream is made immediately, when the resulting
    /// stream is created.
    public func flatMap<NewElement>(_ transform: @escaping (Element) -> EventStream<NewElement>) -> EventStream<NewElement> {
        return EventStream<NewElement>.create { (handler: StreamHandler<NewElement>) in
            // Bookkeeping shared (by reference capture) between the closures below.
            var activeInnerStreams = 0
            var outerCompleted = false

            _ = self.subscribe(
                onNext: { value in
                    // Count the inner stream before subscribing: it may complete
                    // synchronously during `subscribe`.
                    activeInnerStreams += 1
                    let newStream = transform(value)
                    _ = newStream.subscribe(
                        onNext: { innerValue in
                            handler.send(.next(innerValue))
                        },
                        onError: { innerError in
                            handler.send(.error(innerError))
                        },
                        onComplete: {
                            activeInnerStreams -= 1
                            if outerCompleted && activeInnerStreams == 0 {
                                handler.send(.completed)
                            }
                        }
                    )
                },
                onError: { error in
                    handler.send(.error(error))
                },
                onComplete: {
                    outerCompleted = true
                    if activeInnerStreams == 0 {
                        handler.send(.completed)
                    }
                }
            )
        }
    }
}
import UIKit
import PlaygroundSupport
// Source stream for the demo: emits 10, 20 and 30 and never completes.
let stream = EventStream<Int>.create { (sink: StreamHandler<Int>) in
    for number in [10, 20, 30] {
        sink.send(.next(number))
    }
}
/// Returns a stream that emits a textual description of `number` on the main
/// queue three seconds after the stream is created. The stream never completes.
func createDelayedDescription(for number: Int) -> EventStream<String> {
    return EventStream<String>.create { (sink: StreamHandler<String>) in
        let deadline = DispatchTime.now() + 3.0
        DispatchQueue.main.asyncAfter(deadline: deadline) {
            let description = "Number is: \(number)"
            sink.send(.next(description))
        }
    }
}
print("Starting!")

// Pipeline: double each number, subtract 12, then turn it into a delayed description.
let subscription = stream
    .map { number in number * 2 }
    .map { doubled in doubled - 12 }
    .flatMap { createDelayedDescription(for: $0) }
    .subscribe(
        onNext: { print($0) },
        onComplete: { print("COMPLETED!") }
    )

// Keep the playground running so the delayed emissions can arrive.
PlaygroundPage.current.needsIndefiniteExecution = true
// It prints...
//
// Starting!
// Number is: 8
// Number is: 28
// Number is: 48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment