Last active
May 15, 2019 20:06
-
-
Save kaqu/130de170b23fb2725973e97101b3df0e to your computer and use it in GitHub Desktop.
Swift Actor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////// | |
//// From futura: https://github.com/miquido/futura //// | |
//////////////////////////////////////////////////////// | |
import Darwin | |
import libkern | |
public enum Mutex { | |
public typealias Pointer = UnsafeMutablePointer<pthread_mutex_t> | |
@inline(__always) | |
public static func make(recursive: Bool) -> Pointer { | |
let pointer: UnsafeMutablePointer<pthread_mutex_t> = .allocate(capacity: 1) | |
let attr: UnsafeMutablePointer<pthread_mutexattr_t> = .allocate(capacity: 1) | |
guard pthread_mutexattr_init(attr) == 0 else { preconditionFailure() } | |
pthread_mutexattr_settype(attr, recursive ? PTHREAD_MUTEX_RECURSIVE : PTHREAD_MUTEX_NORMAL) | |
pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_PRIVATE) | |
guard pthread_mutex_init(pointer, attr) == 0 else { preconditionFailure() } | |
pthread_mutexattr_destroy(attr) | |
attr.deinitialize(count: 1) | |
attr.deallocate() | |
return pointer | |
} | |
@inline(__always) | |
public static func destroy(_ pointer: Pointer) { | |
pthread_mutex_destroy(pointer) | |
pointer.deinitialize(count: 1) | |
pointer.deallocate() | |
} | |
@inline(__always) | |
public static func lock(_ pointer: Pointer) { | |
pthread_mutex_lock(pointer) | |
} | |
@inline(__always) | |
public static func tryLock(_ pointer: Pointer) -> Bool { | |
return pthread_mutex_trylock(pointer) == 0 | |
} | |
@inline(__always) | |
public static func unlock(_ pointer: Pointer) { | |
pthread_mutex_unlock(pointer) | |
} | |
} | |
internal final class List<Element> { | |
internal final class Item { | |
internal let element: Element | |
fileprivate var next: Item? | |
fileprivate init(element: Element) { | |
self.element = element | |
} | |
} | |
internal let mtx = Mutex.make(recursive: false) | |
internal var nextItem: Item? | |
internal var lastItem: Item? | |
deinit { | |
Mutex.destroy(mtx) | |
} | |
@inline(__always) | |
fileprivate func append(_ element: Element) { | |
Mutex.lock(mtx) | |
defer { Mutex.unlock(mtx) } | |
let item: Item = .init(element: element) | |
if let lastItem = lastItem { | |
lastItem.next = item | |
} else { | |
self.nextItem = item | |
} | |
self.lastItem = item | |
} | |
@inline(__always) | |
fileprivate func next() -> Element? { | |
Mutex.lock(mtx) | |
defer { Mutex.unlock(mtx) } | |
guard let nextItem = nextItem else { | |
return nil | |
} | |
if let item = nextItem.next { | |
self.nextItem = item | |
} else { | |
self.nextItem = nil | |
self.lastItem = nil | |
} | |
return nextItem.element | |
} | |
} | |
public enum AtomicFlag { | |
public typealias Pointer = UnsafeMutablePointer<atomic_flag> | |
@inline(__always) | |
public static func make() -> Pointer { | |
let pointer = Pointer.allocate(capacity: 1) | |
pointer.pointee = atomic_flag() | |
return pointer | |
} | |
@inline(__always) | |
public static func destroy(_ pointer: Pointer) { | |
pointer.deinitialize(count: 1) | |
pointer.deallocate() | |
} | |
@discardableResult @inline(__always) | |
public static func readAndSet(_ pointer: Pointer) -> Bool { | |
return atomic_flag_test_and_set(pointer) | |
} | |
@inline(__always) | |
public static func clear(_ pointer: Pointer) { | |
atomic_flag_clear(pointer) | |
} | |
} | |
//////////////////////////////////////////////////////// | |
//////////////////////// original ////////////////////// | |
//////////////////////////////////////////////////////// | |
import Foundation | |
public protocol Executor { | |
func execute(_ task: @escaping () -> Void) | |
} | |
public final class Actor<Message, State> { | |
private let mtx: Mutex.Pointer | |
private var messageQueue: List<Message> = .init() | |
private let executor: Executor | |
private var activityFlag: AtomicFlag.Pointer | |
private let process: (Message, State) -> State | |
private var state: State | |
internal init(executor: Executor = DispatchQueue.global(), initialState: State, _ process: @escaping (Message, State) -> State) { | |
precondition(!(type(of: initialState) is AnyClass), "You can use only value types in Actor (struct / enum)") | |
self.mtx = Mutex.make(recursive: true) | |
self.activityFlag = AtomicFlag.make() | |
self.executor = executor | |
self.state = initialState | |
self.process = process | |
} | |
deinit { | |
Mutex.destroy(mtx) | |
mtx.deallocate() | |
AtomicFlag.destroy(activityFlag) | |
activityFlag.deallocate() | |
} | |
public func pass(_ message: Message) { | |
Mutex.lock(mtx) | |
defer { Mutex.unlock(mtx) } | |
messageQueue.append(message) | |
guard !AtomicFlag.readAndSet(activityFlag) else { return } | |
executor.execute(exec) | |
} | |
private func exec() { | |
while let message = messageQueue.next() { | |
state = process(message, state) | |
} | |
AtomicFlag.clear(activityFlag) | |
} | |
} | |
extension DispatchQueue: Executor { | |
public func execute(_ task: @escaping () -> Void) { | |
async(execute: task) | |
} | |
} |
And producer/consumer example:
enum ProducerActorMessage<Product> {
case produce
case assingConsumer((Product) -> Void)
}
struct ProducerActorState<Product> {
var consumers: [(Product) -> Void] = []
}
let producerActor: Actor<ProducerActorMessage<Int>, ProducerActorState<Int>>
= .init(initialState: .init())
{ (message: ProducerActorMessage<Int>, state: ProducerActorState<Int>) -> ProducerActorState<Int> in
switch message {
case .produce:
state.consumers.forEach { $0(Int.random(in: Int.min ... Int.max)) }
return state
case let .assingConsumer(consumer):
var newState = state
newState.consumers.append(consumer)
return newState
}
}
enum ConsumerActorMessage<Item> {
case recieve(Item)
}
let consumerActor: Actor<ConsumerActorMessage<Int>, Void>
= .init(initialState: ())
{ (message: ConsumerActorMessage<Int>, state: Void) -> Void in
switch message {
case let .recieve(item):
print("recieved \(item)")
}
return state
}
producerActor.pass(.assingConsumer({ consumerActor.pass(.recieve($0))}))
producerActor.pass(.produce)
producerActor.pass(.produce)
producerActor.pass(.produce)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example:
It is not allowed to use class (reference types) inside. Code below will fail (not pass precondition).