Skip to content

Instantly share code, notes, and snippets.

@rob-brown rob-brown/Actor.swift
Last active Mar 8, 2018

Embed
What would you like to do?
Elixir-inspired concurrency primitives
//
// Actor.Swift
//
// Copyright (c) 2017 Robert Brown
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
import Foundation
public final class Actor<Message, State> {
public typealias MessageHandler = (Message, State) -> State
public let mailbox: Mailbox<Message>
public convenience init(initialState: State, label: String = "pro.tricksofthetrade.Actor", messageHandler: @escaping MessageHandler) {
let state = Agent(state: initialState, label: label)
self.init(state: state, messageHandler: messageHandler)
}
public convenience init(initialState: State, queue: DispatchQueue, messageHandler: @escaping MessageHandler) {
let state = Agent(state: initialState, queue: queue)
self.init(state: state, messageHandler: messageHandler)
}
private init(state: Agent<State>, messageHandler: @escaping MessageHandler) {
self.mailbox = Mailbox<Message> { message in
state.update { messageHandler(message, $0) }
}
}
public func send(_ message: Message) {
mailbox.send(message)
}
}
public final class Mailbox<T> {
private let handler: ((T) -> Void)
public init(handler: @escaping ((T) -> Void)) {
self.handler = handler
}
public func send(_ message: T) {
handler(message)
}
}
//
// ActorRegistry.Swift
//
// Copyright (c) 2017 Robert Brown
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
import Foundation
public enum ActorRegistryError: Error {
case idInUse
case noSuchID
case typeMismatch
}
public final class ActorRegistry {
private let registry = Agent<[ID:Any]>(state: [:], label: "pro.tricksofthetrade.ActorRegistry")
public init() {}
public func lookup<T, U>(id: ID) throws -> Actor<T, U> {
guard let mailbox = registry.fetch(closure: { $0[id] }) else { throw ActorRegistryError.noSuchID }
guard let result = mailbox as? Actor<T, U> else { throw ActorRegistryError.typeMismatch }
return result
}
public func register<T, U>(id: ID, actor: Actor<T, U>) throws {
let error: ActorRegistryError? = registry.fetchAndUpdate { state in
if state[id] == nil {
var newState = state
newState[id] = actor
return (nil, newState)
}
else {
return (.idInUse, state)
}
}
if let e = error {
throw e
}
}
public func unregister(id: ID) {
registry.update { state in
var newState = state
newState.removeValue(forKey: id)
return newState
}
}
}
extension ActorRegistry {
public struct ID: RawRepresentable, Equatable, Hashable, Comparable {
public let rawValue: String
public var hashValue: Int {
return rawValue.hashValue
}
public init(_ rawValue: String) {
self.rawValue = rawValue
}
public init(rawValue: String) {
self.rawValue = rawValue
}
public static func ==(lhs: ActorRegistry.ID, rhs: ActorRegistry.ID) -> Bool {
return lhs.rawValue == rhs.rawValue
}
public static func <(lhs: ActorRegistry.ID, rhs: ActorRegistry.ID) -> Bool {
return lhs.rawValue < rhs.rawValue
}
}
}
//
// Agent.Swift
//
// Copyright (c) 2017 Robert Brown
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
import Foundation
public enum AgentConcurrencyType {
case sync
case async
}
public final class Agent<T> {
private let queue: DispatchQueue
private var state: T
public convenience init(state: T, label: String = "pro.tricksofthetrade.Agent") {
let queue = DispatchQueue(label: label, qos: .userInitiated, attributes: [], autoreleaseFrequency: .inherit, target: nil)
self.init(state: state, queue: queue)
}
public init(state: T, queue: DispatchQueue) {
self.state = state
self.queue = queue
}
public func fetch<U>(closure: ((T) -> U)) -> U {
var result: U!
sync { state in
result = closure(state)
}
return result
}
public func update(_ type: AgentConcurrencyType = .async, closure: @escaping (T) -> T) {
switch type {
case .async:
async { state in
self.state = closure(state)
}
case .sync:
sync { state in
self.state = closure(state)
}
}
}
public func fetchAndUpdate<U>(closure: (T) -> (U, T)) -> U {
var result: U!
sync { state in
let (returnValue, newState) = closure(state)
self.state = newState
result = returnValue
}
return result
}
public func cast(closure: @escaping ((T) -> Void)) {
async(closure: closure)
}
// MARK: - Helpers
private func sync(closure: ((T) -> Void)) {
queue.sync {
closure(state)
}
}
private func async(closure: @escaping ((T) -> Void)) {
queue.async {
closure(self.state)
}
}
}
//
// Process.Swift
//
// Copyright (c) 2017 Robert Brown
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
import Foundation
public struct TaggedTuple<T> {
public let tag: String
public let value: T
public init(tag: String, value: T) {
self.tag = tag
self.value = value
}
public func toTuple() -> (tag: String, value: T) {
return (tag, value)
}
}
public typealias ProcessMessage = TaggedTuple<Any?>
public enum ProcessCastResult<T> {
case unknownMessage
case newState(T)
case newStateAndTimeout(T, TimeInterval)
}
public enum ProcessCallResult<State> {
case unknownMessage
case noReply(State)
case reply(ProcessMessage, State)
case replyAndTimeout(ProcessMessage, State, TimeInterval)
}
public enum ProcessTimeoutResult<T> {
case newState(T)
case newStateAndTimeout(T, TimeInterval)
}
public protocol ProcessDelegate: class {
associatedtype State
func handleCast(message: ProcessMessage, state: State) -> ProcessCastResult<State>
func handleCall(message: ProcessMessage, state: State) -> ProcessCallResult<State>
func handleTimeout(state: State) -> ProcessTimeoutResult<State>
func handleUnknownMessage(message: ProcessMessage, state: State)
}
extension ProcessDelegate {
func handleCast(message: ProcessMessage, state: State) -> ProcessCastResult<State> {
return .newState(state)
}
func handleCall(message: ProcessMessage, state: State) -> ProcessCallResult<State> {
return .noReply(state)
}
func handleTimeout(state: State) -> ProcessCastResult<State> {
return .newState(state)
}
func handleUnknownMessage(message: ProcessMessage, state: State) {
NSLog("Process got unknown message: \(message)")
}
}
public final class Process<Delegate: ProcessDelegate> {
public typealias State = Delegate.State
private let state: Agent<State>
private unowned let delegate: Delegate
private var timeout: TimeInterval?
private var timer: DispatchSource?
public convenience init(state: State, delegate: Delegate, label: String = "pro.tricksofthetrade.Process") {
self.init(state: Agent(state: state, label: label), delegate: delegate)
}
public init(state: Agent<State>, delegate: Delegate) {
self.state = state
self.delegate = delegate
}
public func cast(message: ProcessMessage) {
state.update { state in
let result = self.delegate.handleCast(message: message, state: state)
switch result {
case .unknownMessage:
self.updateTimeout(timeout: self.timeout)
self.delegate.handleUnknownMessage(message: message, state: state)
return state
case .newState(let newState):
self.updateTimeout(timeout: self.timeout)
return newState
case .newStateAndTimeout(let newState, let timeout):
self.updateTimeout(timeout: timeout)
return newState
}
}
}
public func call(message: ProcessMessage) -> ProcessMessage? {
return state.fetchAndUpdate { state in
let result = self.delegate.handleCall(message: message, state: state)
switch result {
case .unknownMessage:
self.updateTimeout(timeout: self.timeout)
self.delegate.handleUnknownMessage(message: message, state: state)
return (nil, state)
case .noReply(let state):
self.updateTimeout(timeout: self.timeout)
return (nil, state)
case .reply(let returnValue, let newState):
self.updateTimeout(timeout: self.timeout)
return (returnValue, newState)
case .replyAndTimeout(let returnValue, let newState, let timeout):
self.updateTimeout(timeout: timeout)
return (returnValue, newState)
}
}
}
// MARK: - Helpers
private func updateTimeout(timeout: TimeInterval?) {
self.timer?.cancel()
self.timer = nil
self.timeout = timeout
guard let timeout = timeout else { return }
let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.main)
let deadline = DispatchTime.now() + .milliseconds(Int(timeout))
timer.scheduleOneshot(deadline: deadline, leeway: .milliseconds(100))
timer.setEventHandler { [weak self] in
self?.timeoutElapsed()
}
}
private func timeoutElapsed() {
state.update { state in
let result = self.delegate.handleTimeout(state: state)
switch result {
case .newState(let newState):
self.updateTimeout(timeout: self.timeout)
return newState
case .newStateAndTimeout(let newState, let timeout):
self.updateTimeout(timeout: timeout)
return newState
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.