Skip to content

Instantly share code, notes, and snippets.

@eonil
Created May 20, 2020 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eonil/bb3fc5efd4dd1b619e92ff975ba4b780 to your computer and use it in GitHub Desktop.
Save eonil/bb3fc5efd4dd1b619e92ff975ba4b780 to your computer and use it in GitHub Desktop.
How to write an imperative I/O control program with GCD (a spawned thread) and Combine.
import Foundation
import Combine
/// Runs a small, blocking "control program" on a global dispatch queue.
///
/// Commands are pushed into `control`; results are published on `report`.
/// The program body reads its input imperatively through a `Channel`, so it
/// can be written as straight-line code with loops and branches.
///
/// Protocol implemented by the program:
/// 1. The first message must be `111`, otherwise the program fails with
///    `TaskError.unexpectedInput`.
/// 2. The second message selects a command:
///    - `222`: reads 100 further values and, for each value `x`, reports
///      `x * 2` followed by `x * 10 + 1`.
///    - `333`: expects `444`, reports `999` and `888`, expects `555`,
///      then reports `111`.
///    - anything else: no work.
/// 3. On success `report` finishes; on any thrown error `report` fails with
///    that error.
final class Task<I,O> {
    /// Input side: send commands for the control program here.
    let control = PassthroughSubject<Int,Error>()
    /// Output side: the control program publishes its results here.
    let report = PassthroughSubject<Int,Error>()

    init() {
        let i = Channel(control)
        let o = Channel(report)
        // The closure keeps both channels alive for as long as the program runs.
        DispatchQueue.global().async { [i,o] in
            do {
                /// Here we can easily write programs with arbitrary complexity.
                let hello = try i.read()
                guard hello == 111 else { throw TaskError.unexpectedInput(hello) }

                // The command is a *new* message. Switching on the handshake
                // value again would make every non-default branch unreachable.
                let command = try i.read()
                switch command {
                case 222:
                    for _ in 0..<100 {
                        let x = try i.read()
                        o.send(x * 2)
                        o.send(x * 10 + 1)
                    }
                case 333:
                    // Each expected acknowledgement is read from the input.
                    let first = try i.read()
                    guard first == 444 else { throw TaskError.unexpectedInput(first) }
                    o.send(999)
                    o.send(888)
                    let second = try i.read()
                    guard second == 555 else { throw TaskError.unexpectedInput(second) }
                    o.send(111)
                default:
                    break
                }
                // Every successful path terminates the report stream.
                o.send(completion: .finished)
            }
            catch let err {
                o.send(completion: .failure(err))
            }
        }
    }

    deinit {
        // Wake the control program if it is still blocked waiting for input,
        // so its worker thread is not parked forever once the owner is gone.
        // This is a no-op when `control` has already completed.
        control.send(completion: .finished)
    }
}
/// Failure produced by a control program when it receives a message that
/// does not match what it expected at that point.
enum TaskError<I> {
    /// The offending input is carried as the associated value.
    case unexpectedInput(I)
}

// MARK: - Error

extension TaskError: Error {}
/// Bridges a Combine `PassthroughSubject` to blocking, imperative reads.
///
/// Everything published by `pass` is delivered on the private serial queue
/// `gcdq` and buffered in arrival order. `read()` blocks the calling thread
/// until a value or the completion is available.
///
/// Values are kept in a FIFO buffer, so several values sent before the
/// reader gets around to calling `read()` are all delivered, in order.
final class Channel<T> {
    /// Serial queue that guards all mutable state of the channel.
    let gcdq = DispatchQueue(label: "Channel")
    /// Counts events (values or completion) that are ready to be read.
    let sema = DispatchSemaphore(value: 0)
    /// The subject this channel listens to and forwards `send` calls to.
    let pass: PassthroughSubject<T,Error>
    /// Keeps the subscription to `pass` alive for the channel's lifetime.
    var pipes = [AnyCancellable]()
    /// The most recently received value while unread values are buffered;
    /// `nil` once the buffer has been drained. Retained as a stored property
    /// for compatibility; `read()` is served from the FIFO buffer.
    var latestValue: T?
    /// The completion received from `pass`, if any.
    var latestCompletion: Subscribers.Completion<Error>?
    /// Unread values in arrival order.
    private var pendingValues = [T]()

    /// Subscribes to `x` and starts buffering whatever it publishes.
    /// - Parameter x: The subject to read from and send to.
    init(_ x:PassthroughSubject<T,Error>) {
        pass = x
        // Weak captures: the subscription is stored in `pipes`, which the
        // channel owns, so a strong capture would form a retain cycle.
        pass.receive(on: gcdq).sink(
            receiveCompletion: { [weak self] m in self?.processCompletion(m) },
            receiveValue: { [weak self] m in self?.process(m) })
        .store(in: &pipes)
    }

    /// Records the completion and wakes one waiting reader. Runs on `gcdq`.
    private func processCompletion(_ m:Subscribers.Completion<Error>) {
        latestCompletion = m
        sema.signal()
    }

    /// Buffers a value and wakes one waiting reader. Runs on `gcdq`.
    private func process(_ m:T) {
        pendingValues.append(m)
        latestValue = m
        sema.signal()
    }

    /// Blocks until the next value is available and returns it.
    ///
    /// Values are returned in the order they were published. Once every
    /// buffered value has been consumed and the subject has completed, this
    /// call, and every later call, throws instead of blocking.
    ///
    /// - Returns: The oldest unread value.
    /// - Throws: `ChannelCompletionError.complete` when the subject has
    ///   completed and no values remain; `ChannelCompletionError.cancel`
    ///   if the reader was woken with neither a value nor a completion.
    func read() throws -> T {
        sema.wait()
        return try gcdq.sync {
            if !pendingValues.isEmpty {
                let m = pendingValues.removeFirst()
                if pendingValues.isEmpty {
                    latestValue = nil
                }
                return m
            }
            if let m = latestCompletion {
                // Re-arm the semaphore so the terminal state is sticky:
                // later reads throw again instead of blocking forever.
                sema.signal()
                throw ChannelCompletionError.complete(m)
            }
            throw ChannelCompletionError.cancel
        }
    }

    /// Publishes a value on the underlying subject.
    func send(_ m:T) {
        pass.send(m)
    }

    /// Publishes a completion on the underlying subject.
    func send(completion x:Subscribers.Completion<Error>) {
        pass.send(completion: x)
    }
}
/// Reasons a blocking channel read can end without producing a value.
enum ChannelCompletionError {
    /// The upstream subject completed; carries the completion it sent.
    case complete(Subscribers.Completion<Error>)
    /// The read ended with neither a value nor a completion available.
    case cancel
}

// MARK: - Error

extension ChannelCompletionError: Error {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment