Skip to content

Instantly share code, notes, and snippets.

@kaqu
Last active May 15, 2019 20:06
Show Gist options
  • Save kaqu/130de170b23fb2725973e97101b3df0e to your computer and use it in GitHub Desktop.
Save kaqu/130de170b23fb2725973e97101b3df0e to your computer and use it in GitHub Desktop.
Swift Actor
////////////////////////////////////////////////////////
//// From futura: https://github.com/miquido/futura ////
////////////////////////////////////////////////////////
import Darwin
import libkern
public enum Mutex {
public typealias Pointer = UnsafeMutablePointer<pthread_mutex_t>
@inline(__always)
public static func make(recursive: Bool) -> Pointer {
let pointer: UnsafeMutablePointer<pthread_mutex_t> = .allocate(capacity: 1)
let attr: UnsafeMutablePointer<pthread_mutexattr_t> = .allocate(capacity: 1)
guard pthread_mutexattr_init(attr) == 0 else { preconditionFailure() }
pthread_mutexattr_settype(attr, recursive ? PTHREAD_MUTEX_RECURSIVE : PTHREAD_MUTEX_NORMAL)
pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_PRIVATE)
guard pthread_mutex_init(pointer, attr) == 0 else { preconditionFailure() }
pthread_mutexattr_destroy(attr)
attr.deinitialize(count: 1)
attr.deallocate()
return pointer
}
@inline(__always)
public static func destroy(_ pointer: Pointer) {
pthread_mutex_destroy(pointer)
pointer.deinitialize(count: 1)
pointer.deallocate()
}
@inline(__always)
public static func lock(_ pointer: Pointer) {
pthread_mutex_lock(pointer)
}
@inline(__always)
public static func tryLock(_ pointer: Pointer) -> Bool {
return pthread_mutex_trylock(pointer) == 0
}
@inline(__always)
public static func unlock(_ pointer: Pointer) {
pthread_mutex_unlock(pointer)
}
}
internal final class List<Element> {
internal final class Item {
internal let element: Element
fileprivate var next: Item?
fileprivate init(element: Element) {
self.element = element
}
}
internal let mtx = Mutex.make(recursive: false)
internal var nextItem: Item?
internal var lastItem: Item?
deinit {
Mutex.destroy(mtx)
}
@inline(__always)
fileprivate func append(_ element: Element) {
Mutex.lock(mtx)
defer { Mutex.unlock(mtx) }
let item: Item = .init(element: element)
if let lastItem = lastItem {
lastItem.next = item
} else {
self.nextItem = item
}
self.lastItem = item
}
@inline(__always)
fileprivate func next() -> Element? {
Mutex.lock(mtx)
defer { Mutex.unlock(mtx) }
guard let nextItem = nextItem else {
return nil
}
if let item = nextItem.next {
self.nextItem = item
} else {
self.nextItem = nil
self.lastItem = nil
}
return nextItem.element
}
}
public enum AtomicFlag {
public typealias Pointer = UnsafeMutablePointer<atomic_flag>
@inline(__always)
public static func make() -> Pointer {
let pointer = Pointer.allocate(capacity: 1)
pointer.pointee = atomic_flag()
return pointer
}
@inline(__always)
public static func destroy(_ pointer: Pointer) {
pointer.deinitialize(count: 1)
pointer.deallocate()
}
@discardableResult @inline(__always)
public static func readAndSet(_ pointer: Pointer) -> Bool {
return atomic_flag_test_and_set(pointer)
}
@inline(__always)
public static func clear(_ pointer: Pointer) {
atomic_flag_clear(pointer)
}
}
////////////////////////////////////////////////////////
//////////////////////// original //////////////////////
////////////////////////////////////////////////////////
import Foundation
public protocol Executor {
func execute(_ task: @escaping () -> Void)
}
public final class Actor<Message, State> {
private let mtx: Mutex.Pointer
private var messageQueue: List<Message> = .init()
private let executor: Executor
private var activityFlag: AtomicFlag.Pointer
private let process: (Message, State) -> State
private var state: State
internal init(executor: Executor = DispatchQueue.global(), initialState: State, _ process: @escaping (Message, State) -> State) {
precondition(!(type(of: initialState) is AnyClass), "You can use only value types in Actor (struct / enum)")
self.mtx = Mutex.make(recursive: true)
self.activityFlag = AtomicFlag.make()
self.executor = executor
self.state = initialState
self.process = process
}
deinit {
Mutex.destroy(mtx)
mtx.deallocate()
AtomicFlag.destroy(activityFlag)
activityFlag.deallocate()
}
public func pass(_ message: Message) {
Mutex.lock(mtx)
defer { Mutex.unlock(mtx) }
messageQueue.append(message)
guard !AtomicFlag.readAndSet(activityFlag) else { return }
executor.execute(exec)
}
private func exec() {
while let message = messageQueue.next() {
state = process(message, state)
}
AtomicFlag.clear(activityFlag)
}
}
extension DispatchQueue: Executor {
public func execute(_ task: @escaping () -> Void) {
async(execute: task)
}
}
@kaqu
Copy link
Author

kaqu commented May 15, 2019

And producer/consumer example:

enum ProducerActorMessage<Product> {
    case produce
    case assingConsumer((Product) -> Void)
}
struct ProducerActorState<Product> {
    var consumers: [(Product) -> Void] = []
}
let producerActor: Actor<ProducerActorMessage<Int>, ProducerActorState<Int>>
    = .init(initialState: .init())
    { (message: ProducerActorMessage<Int>, state: ProducerActorState<Int>) -> ProducerActorState<Int> in
        switch message {
        case .produce:
            state.consumers.forEach { $0(Int.random(in: Int.min ... Int.max)) }
            return state
        case let .assingConsumer(consumer):
            var newState = state
            newState.consumers.append(consumer)
            return newState
        }
}

enum ConsumerActorMessage<Item> {
    case recieve(Item)
}
let consumerActor: Actor<ConsumerActorMessage<Int>, Void>
    = .init(initialState: ())
    { (message: ConsumerActorMessage<Int>, state: Void) -> Void in
        switch message {
        case let .recieve(item):
            print("recieved \(item)")
        }
        return state
}
producerActor.pass(.assingConsumer({ consumerActor.pass(.recieve($0))}))
producerActor.pass(.produce)
producerActor.pass(.produce)
producerActor.pass(.produce)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment