Skip to content

Instantly share code, notes, and snippets.

@s4cha
Last active September 20, 2018 13:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s4cha/1b91936dc31dc5175b7f75ab5e43323c to your computer and use it in GitHub Desktop.
Save s4cha/1b91936dc31dc5175b7f75ab5e43323c to your computer and use it in GitHub Desktop.
Channel (pub sub ) in swift
public class Channel<Message: Equatable>: IsChannel {
private var subscriptions = ThreadSafeArray<Subscription>()
public func broadcast(_ message: Message) {
subscriptions.forEach { $0.trigger(message: message) }
}
public func subscribe(_ object: AnyObject,
for specificMessage: Message,
on queue: DispatchQueue? = nil,
callback: @escaping () -> Void) {
let filteredCallback = { (message: Message) in
if message == specificMessage { callback() }
}
subscriptions.append(Subscription(object: object, callback: filteredCallback, queue: queue))
}
public func subscribe(object: AnyObject, callback: @escaping (Message) -> Void) {
subscriptions.append(Subscription(object: object, callback: callback, queue: nil))
}
public func unbscribe(object: AnyObject) {
subscriptions.remove(where: { $0.object === object })
}
private struct Subscription {
weak var object: AnyObject?
let callback: (Message) -> Void
let queue: DispatchQueue?
init(object: AnyObject?, callback: @escaping (Message) -> Void, queue: DispatchQueue?) {
self.object = object
self.callback = callback
self.queue = queue
}
func trigger(message: Message) {
// Make sure object is still alive
if object == nil { return }
if let q = queue {
q.async { if self.object != nil { self.callback(message) } }
} else {
callback(message)
}
}
}
}
/// This encapsulates a thread safe array protected by its own Dispatch queue.
/// The queue is concurrent to allow for concurrent reads but writes are made
/// with the `barrier` flag so that only one write operation can happen at a time.
/// The write operations wait for pending reads to ficnish and new reads wait for the current
/// write operation to finish.
class ThreadSafeArray<T> {
private let queue = DispatchQueue(label: "com.freshOS.channel.threadSafeArray", attributes: .concurrent)
private var underlyingArray: [T]
init() { self.underlyingArray = [T]() }
func forEach(block: (T) -> Void) {
queue.sync { underlyingArray.forEach(block) }
}
public func append(_ newElement: T) {
threadSafeWrite { $0.append(newElement) }
}
public func remove(where clause: @escaping (T) -> Bool) {
threadSafeWrite { if let index = $0.index(where: clause) { $0.remove(at: index) } }
}
private func threadSafeWrite(callback:@escaping (inout [T]) -> Void) {
queue.async(flags: .barrier) { [weak self] in
if let strongSelf = self { callback(&strongSelf.underlyingArray) }
}
}
}
// Create a channel
let chan = Channel<String>()
// Receive all messages
chan.subscribe(object: self) { message in
print(message)
}
// Receive specific messages on a certain thread
chan.subscribe(self, for: "Hello", on: .main) {
print("received Hello")
}
// Send Message
chan.broadcast("Hello")
// Stop listening
chan.unbscribe(object: self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment