Skip to content

Instantly share code, notes, and snippets.

@kaqu
Created September 28, 2019 19:43
Show Gist options
  • Save kaqu/3dc0d7aac70fdedd71dfe14807a2bc94 to your computer and use it in GitHub Desktop.
Save kaqu/3dc0d7aac70fdedd71dfe14807a2bc94 to your computer and use it in GitHub Desktop.
Futures
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import libkern // check on Linux
public enum AtomicFlag {
public typealias Pointer = UnsafeMutablePointer<atomic_flag>
@inline(__always)
public static func make() -> Pointer {
let pointer = Pointer.allocate(capacity: 1)
pointer.pointee = atomic_flag()
return pointer
}
@inline(__always)
public static func destroy(_ pointer: Pointer) {
pointer.deinitialize(count: 1)
pointer.deallocate()
}
@discardableResult @inline(__always)
public static func readAndSet(_ pointer: Pointer) -> Bool {
return atomic_flag_test_and_set(pointer)
}
@inline(__always)
public static func clear(_ pointer: Pointer) {
atomic_flag_clear(pointer)
}
}
#else
#error("Unsupported platform")
#endif
@usableFromInline
internal final class CancelableFuture<Value>: Future<Value> {
fileprivate var cancelation: Cancelation = .init()
@usableFromInline
internal convenience init(cancelation: Cancelation) {
self.init()
self.cancelation = cancelation
}
override func unsafeComplete(with value: Value) {
guard cancelation.invalidate() else { return }
super.unsafeComplete(with: value)
}
}
// MARK: Cancel
public extension Future {
/// - warning: Cancelation object is not reusable, once it is canceled or future executes it is useless
@inlinable
func cancelable(using cancelation: Cancelation) -> Future {
lock.lock()
defer { lock.unlock() }
let nextFuture: CancelableFuture<Value> = .init(cancelation: cancelation)
unsafeExecuteOrSchedule(nextFuture.complete(with:))
return nextFuture
}
}
public final class Cancelation {
private let flag: AtomicFlag.Pointer = AtomicFlag.make()
public init() {}
deinit {
AtomicFlag.destroy(flag)
}
@discardableResult
public func cancel() -> Bool {
!AtomicFlag.readAndSet(flag)
}
internal func invalidate() -> Bool {
!AtomicFlag.readAndSet(flag)
}
}
import Foundation
public protocol Executor {
func execute(_ task: @escaping () -> Void)
}
extension DispatchQueue: Executor {
@inlinable public func execute(_ task: @escaping () -> Void) {
async(execute: task)
}
}
extension OperationQueue: Executor {
@inlinable public func execute(_ task: @escaping () -> Void) {
if OperationQueue.current == self {
task()
} else {
addOperation(task)
}
}
}
@usableFromInline
internal final class ExecutorFuture<Value>: Future<Value> {
@usableFromInline internal fileprivate(set) var executor: Executor? = nil
@usableFromInline convenience internal init(with value: Value? = nil, executor: Executor? = nil) {
self.init(with: value)
self.executor = executor
}
override func unsafeComplete(with value: Value) {
guard case .none = self.value else { return /* TODO: add log */ }
self.value = value
if let executor = executor {
executor.execute {
self.observers.forEach { $0(value) }
self.observers = .init()
}
} else {
observers.forEach { $0(value) }
observers = .init()
}
}
override func unsafeExecuteOrSchedule(_ action: @escaping (Value) -> Void) {
if let value = value {
if let executor = executor {
executor.execute {
action(value)
}
} else {
action(value)
}
} else {
observers.append(action)
}
}
}
// MARK: Executor switch
public extension Future {
@inlinable
func execute(using executor: Executor) -> Future {
lock.lock()
defer { lock.unlock() }
let nextFuture: ExecutorFuture<Value> = .init(executor: executor)
unsafeExecuteOrSchedule(nextFuture.complete(with:))
return nextFuture
}
}
public class Future<Value> {
@usableFromInline internal var value: Value? = nil
@usableFromInline internal let lock: Lock = .init()
@usableFromInline internal var observers: Array<(Value) -> Void> = .init()
@usableFromInline internal init(with value: Value? = nil) {
self.value = value
self.observers.reserveCapacity(1)
}
@inlinable
@inline(__always)
func unsafeComplete(with value: Value) {
guard case .none = self.value else { return /* TODO: add log */ }
self.value = value
observers.forEach { $0(value) }
observers = .init()
}
@inlinable
@inline(__always)
func unsafeExecuteOrSchedule(_ action: @escaping (Value) -> Void) {
if let value = value {
action(value)
} else {
observers.append(action)
}
}
}
// MARK: Initialization
public extension Future {
convenience init(completedWith value: Value) { self.init(with: value) }
convenience init
<Success, Failure: Error>
(succeededWith value: Success)
where Value == Result<Success, Failure> {
self.init(completedWith: .success(value))
}
convenience init
<Success, Failure: Error>
(failedWith error: Failure)
where Value == Result<Success, Failure> {
self.init(with: .failure(error))
}
static func never() -> Future { return NeverFuture() }
}
// MARK: Completion
internal extension Future {
@usableFromInline
func complete(with value: Value) {
lock.lock()
defer { lock.unlock() }
unsafeComplete(with: value)
}
}
// MARK: Handlers
public extension Future {
@inlinable
func completion(_ action: @escaping (Value) -> Void) -> Void {
lock.lock()
defer { lock.unlock() }
unsafeExecuteOrSchedule(action)
}
}
public extension Future {
@inlinable
func success
<Success, Failure>
(_ handler: @escaping (Success) -> Void) -> Future<Failure>
where Failure: Error, Value == Result<Success, Failure> {
lock.lock()
defer { lock.unlock() }
let failureFuture: Future<Failure> = .init()
unsafeExecuteOrSchedule {
switch $0 {
case let .success(value):
handler(value)
case let .failure(reason):
failureFuture.complete(with: reason)
}
}
return failureFuture
}
@inlinable func failure
<Success, Failure>
(_ handler: @escaping (Failure) -> Void) -> Future<Success>
where Failure: Error, Value == Result<Success, Failure> {
lock.lock()
defer { lock.unlock() }
let successFuture: Future<Success> = .init()
unsafeExecuteOrSchedule {
switch $0 {
case let .success(value):
successFuture.complete(with: value)
case let .failure(reason):
handler(reason)
}
}
return successFuture
}
}
// MARK: Transformations
public extension Future {
@inlinable
func map<Mapped>(_ transformation: @escaping (Value) -> Mapped) -> Future<Mapped> {
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Mapped> = .init()
unsafeExecuteOrSchedule {
mappedFuture.complete(with: transformation($0))
}
return mappedFuture
}
@inlinable
func flatMap<Mapped>(_ transformation: @escaping (Value) -> Future<Mapped>) -> Future<Mapped> {
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Mapped> = .init()
unsafeExecuteOrSchedule {
transformation($0).completion(mappedFuture.complete(with:))
}
return mappedFuture
}
}
public extension Future {
@inlinable
func throwingMap<Mapped>(_ transformation: @escaping (Value) throws -> Mapped) -> Future<Result<Mapped, Error>> {
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Mapped, Error>> = .init()
unsafeExecuteOrSchedule { value in
mappedFuture.complete(with: Result { try transformation(value) })
}
return mappedFuture
}
@inlinable
func throwingResultMap<Mapped>(_ transformation: @escaping (Value) throws -> Result<Mapped, Error>) -> Future<Result<Mapped, Error>> {
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Mapped, Error>> = .init()
unsafeExecuteOrSchedule { value in
do {
try mappedFuture.complete(with: transformation(value))
} catch {
mappedFuture.complete(with: .failure(error))
}
}
return mappedFuture
}
@inlinable func mapSuccess
<Success, Failure, Mapped>
(_ transformation: @escaping (Success) -> Mapped) -> Future<Result<Mapped, Failure>>
where Failure: Error, Value == Result<Success, Failure>
{
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Mapped, Failure>> = .init()
unsafeExecuteOrSchedule {
mappedFuture.complete(with: $0.map(transformation))
}
return mappedFuture
}
@inlinable func resultMapSuccess
<Success, Failure, Mapped>
(_ transformation: @escaping (Success) -> Result<Mapped, Failure>) -> Future<Result<Mapped, Failure>>
where Failure: Error, Value == Result<Success, Failure>
{
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Mapped, Failure>> = .init()
unsafeExecuteOrSchedule {
mappedFuture.complete(with: $0.flatMap(transformation))
}
return mappedFuture
}
@inlinable func mapFailure
<Success, Failure, Mapped>
(_ transformation: @escaping (Failure) -> Mapped) -> Future<Result<Success, Mapped>>
where Failure: Error, Mapped: Error, Value == Result<Success, Failure>
{
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Success, Mapped>> = .init()
unsafeExecuteOrSchedule {
mappedFuture.complete(with: $0.mapError(transformation))
}
return mappedFuture
}
@inlinable func resultMapFailure
<Success, Failure, Mapped>
(_ transformation: @escaping (Failure) -> Result<Success, Mapped>) -> Future<Result<Success, Mapped>>
where Failure: Error, Mapped: Error, Value == Result<Success, Failure>
{
lock.lock()
defer { lock.unlock() }
let mappedFuture: Future<Result<Success, Mapped>> = .init()
unsafeExecuteOrSchedule {
mappedFuture.complete(with: $0.flatMapError(transformation))
}
return mappedFuture
}
}
// MARK: zip
@inlinable
public func zip<T1, T2>(_ f1: Future<T1>, _ f2: Future<T2>) -> Future<(T1, T2)> {
var result: (T1?, T2?) = (nil, nil)
let resultFuture: Future<(T1, T2)> = .init()
let lock: Lock = .init()
f1.completion { val1 in
lock.lock()
if case let (nil, val2?) = result {
resultFuture.complete(with: (val1, val2))
} else {
result = (val1, nil)
lock.unlock()
}
}
f2.completion { val2 in
lock.lock()
if case let (val1?, nil) = result {
resultFuture.complete(with: (val1, val2))
} else {
result = (nil, val2)
lock.unlock()
}
}
return resultFuture
}
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import os
public final class Lock {
fileprivate var ptr: os_unfair_lock = .init()
@usableFromInline
internal init() {}
@usableFromInline
@inline(__always)
internal func lock() {
os_unfair_lock_lock(&ptr)
}
@usableFromInline
@inline(__always)
internal func tryLock() -> Bool {
os_unfair_lock_trylock(&ptr)
}
@usableFromInline
@inline(__always)
internal func unlock() {
os_unfair_lock_unlock(&ptr)
}
}
#else
import Glibc
public final class Lock {
private let ptr: UnsafeMutablePointer<pthread_mutex_t> = {
let pointer: UnsafeMutablePointer<pthread_mutex_t> = .allocate(capacity: 1)
let attr: UnsafeMutablePointer<pthread_mutexattr_t> = .allocate(capacity: 1)
guard pthread_mutexattr_init(attr) == 0 else { preconditionFailure() }
pthread_mutexattr_settype(attr, PTHREAD_MUTEX_NORMAL)
pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_PRIVATE)
guard pthread_mutex_init(pointer, attr) == 0 else { preconditionFailure() }
pthread_mutexattr_destroy(attr)
attr.deinitialize(count: 1)
attr.deallocate()
return pointer
}()
deinit {
pthread_mutex_destroy(ptr)
ptr.deinitialize(count: 1)
ptr.deallocate()
}
@inline(__always)
internal func lock() {
pthread_mutex_lock(ptr)
}
@inline(__always)
internal func tryLock() -> Bool {
pthread_mutex_trylock(ptr) == 0
}
@inline(__always)
internal func unlock() {
pthread_mutex_unlock(ptr)
}
}
#endif
@usableFromInline
internal final class NeverFuture<Value>: Future<Value> {
override func unsafeComplete(with value: Value) {
// ignore
}
override func unsafeExecuteOrSchedule(_ action: @escaping (Value) -> Void) {
// ignore
}
}
public final class Promise<Value> {
public let future: Future<Value> = .init()
public init() {}
public func fulfill(with value: Value) {
future.complete(with: value)
}
}
public extension Promise {
func succeed<Success, Failure: Error>(with value: Success) where Value == Result<Success, Failure> {
future.complete(with: .success(value))
}
func fail<Success, Failure: Error>(with error: Failure) where Value == Result<Success, Failure> {
future.complete(with: .failure(error))
}
}
@kaqu
Copy link
Author

kaqu commented Sep 28, 2019

example (possible function renames):

        let cancelation: Cancelation = .init()
        promise // Promise<Int>
            .future
            // converts type from Int to Result<Int, Error> since error can be thrown
            .throwingMap { (val: Int) -> Int in
                if val < 0 {
                    throw NSError(domain: "Error", code: 42, userInfo: nil)
                } else {
                    return val
                }
            }
             // chain can be canceled from this point using cancelation.cancel(), does not affect previous actions
            .cancelable(using: cancelation)
            // from this point guarantees that everything below is executed on that queue
            .execute(using: DispatchQueue.global())
            // converts type from Result<Int, Error> to Int since error is handled
            .failure { (e: Error) in
                print(e)
            }
            // returns void completing operations chain
            .completion { (result) in
                XCTAssertEqual(result, 42)
                dispatchPrecondition(condition: .notOnQueue(.main))
            }
        promise.fulfill(with: 42)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment