Created
October 18, 2022 20:03
-
-
Save hiredman/19e01f112b52e3a967af597262de3a81 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(import '(java.util ArrayList | |
List) | |
'(java.util.concurrent.locks ReentrantLock) | |
'(java.util.concurrent.atomic AtomicReference | |
AtomicReferenceArray)) | |
(set! *warn-on-reflection* true) | |
(set! *unchecked-math* :warn-on-boxed) | |
;; TODO: should it be possible to sync multiple times on the same event
;; object? Seems like no.
(defprotocol Event
  "An event that can take part in a synchronization."
  (try-event [event resume control nack-group]
    "Try to synchronize on this event. `control` is the per-sync atom that
    starts out as :waiting; an implementation that moves it to :synced then
    calls (resume event value nack-group). `nack-group` is a set that is
    handed back to `resume` (the top-level sync starts it as #{}).")
  ;; TODO split this out
  (check-nack-group [event nack-group]
    "Invoked on every event taking part in a sync, with the nack-group that
    was passed to `resume` by the event that synchronized."))
(defprotocol Syncable
  "Things that can be synchronized on."
  (-sync [event]
    "Start a synchronization on `event`. Use `sync!` / `sync!!` rather than
    calling this directly."))
(defprotocol QuasiEvent
  "Something that can be flattened into a list of events before a sync."
  (push-down [event ctor lst]
    "Add the event(s) that `event` stands for to the java.util.List `lst`,
    passing each one through the one-argument function `ctor` first."))
;; nil is the "never ready" event: trying it, checking its nack group and
;; syncing on it are all no-ops.
(extend-type nil
  Event
  (try-event [_ _resume _control _nack-group]
    nil)
  (check-nack-group [_ _nack-group]
    nil)
  QuasiEvent
  (push-down [this ctor lst]
    ;; nil is only added when ctor turns it into something non-nil
    ;; (e.g. a middleware wrapped around a nil target).
    (let [wrapped (ctor this)]
      (when wrapped
        (.add ^java.util.List lst wrapped))))
  Syncable
  (-sync [_]
    nil))
;; Default behaviour for every non-nil object: it is a leaf when pushed
;; down, and syncing on it flattens it into a list of events that all
;; compete for one shared `control` atom.
(extend-type Object
  QuasiEvent
  (push-down [this ctor lst]
    (let [^List target lst]
      (.add target (ctor this))))
  Syncable
  (-sync [this]
    ;; The list lives in a volatile to ensure ordering of the list
    ;; accesses; that may not be required.
    (let [control (atom :waiting)
          events  (volatile! (ArrayList.))]
      (push-down this identity @events)
      (vreset! events @events)
      ;; notify-all may be invoked on another thread, hence the volatile.
      (let [notify-all (fn [_event _value nack-group]
                         (doseq [candidate @events]
                           (check-nack-group candidate nack-group)))]
        (doseq [candidate @events
                :when candidate]
          (try-event candidate notify-all control #{}))))))
;; An event that is always ready: it claims `control` as soon as it can and
;; resumes with the value true.
(def always
  (reify
    Event
    (try-event [this resume control nack-group]
      (let [watch-key (Object.)]
        (letfn [(attempt [& _]
                  ;; once the sync is decided the watch is no longer needed
                  (when (= :synced @control)
                    (remove-watch control watch-key))
                  (when (compare-and-set! control :waiting :synced)
                    (resume this true nack-group)))]
          (add-watch control watch-key attempt)
          (attempt))))
    (check-nack-group [_ _nack-group]
      nil)))
;; A CompletionStage is an event that becomes ready when the stage
;; completes; the sync value is the exception if there is one, otherwise
;; the result.
(extend-type java.util.concurrent.CompletionStage
  Event
  (try-event [^java.util.concurrent.CompletionStage stage resume control nack-group]
    (let [^java.util.function.BiFunction on-done
          (reify
            java.util.function.BiFunction
            (apply [_ result exception]
              (let [watch-key (Object.)]
                (letfn [(attempt [& _]
                          (when (= :synced @control)
                            (remove-watch control watch-key))
                          (when (compare-and-set! control :waiting :synced)
                            (resume stage (or exception result) nack-group)))]
                  (add-watch control watch-key attempt)
                  (attempt)))))]
      (.handle stage on-done)))
  (check-nack-group [_ _nack-group]
    nil))
(declare middleware) | |
;; Middleware wraps an event, passing protocol calls down with the
;; opportunity to intercept and make changes. The middleware smart
;; constructor can collapse multiple middlewares into a single
;; middleware via function composition.
;; Each *-transform field is a function that takes the corresponding
;; protocol function and returns a replacement with the same argument list;
;; the replacement is then called with `target` in the event position.
(defrecord Middleware [try-event-transform
                       check-nack-group-transform
                       push-down-transform
                       target]
  Event
  (try-event [_ resume control nack-group]
    (let [wrapped-try (try-event-transform try-event)]
      (wrapped-try target resume control nack-group)))
  (check-nack-group [_ nack-group]
    (let [wrapped-check (check-nack-group-transform check-nack-group)]
      (wrapped-check target nack-group)))
  QuasiEvent
  (push-down [_ ctor lst]
    ;; Whatever the target expands to is re-wrapped in this middleware's
    ;; transforms before the caller's ctor sees it.
    (let [rewrap       (fn [inner]
                         (middleware try-event-transform
                                     check-nack-group-transform
                                     push-down-transform
                                     inner))
          wrapped-push (push-down-transform push-down)]
      (wrapped-push target (comp ctor rewrap) lst))))
(defn middleware
  "Smart constructor for Middleware. When `target` is itself a Middleware the
  two layers are collapsed into one by composing the new transforms with the
  target's transforms and wrapping the target's own target."
  [try-event-transform check-nack-group-transform push-down-transform target]
  (if-not (instance? Middleware target)
    (->Middleware try-event-transform
                  check-nack-group-transform
                  push-down-transform
                  target)
    (let [^Middleware inner target]
      (recur (comp try-event-transform (.-try-event-transform inner))
             (comp check-nack-group-transform (.-check-nack-group-transform inner))
             (comp push-down-transform (.-push-down-transform inner))
             (.-target inner)))))
(defn barrier
  "Calls (f e g), where e is a fresh event and g is a function of no
  arguments; e becomes enabled once g has been invoked. Returns the result
  of invoking f.

  Syncs that reach e before g is called are parked as 4-slot cells
  [resume control nack-group next] on a stack rooted at `root`; g swaps the
  root for the `done` sentinel and resumes every parked cell it can claim."
  [f]
  (let [root    (AtomicReference.)
        done    (Object.)
        ;; Keep trying to move `control` from :waiting to :synced; resume on
        ;; success, give up once somebody else has synced it.
        claim!  (fn [evt resume control nack-group]
                  (loop []
                    (if (compare-and-set! control :waiting :synced)
                      (resume evt nil nack-group)
                      (when-not (= @control :synced)
                        (recur)))))
        ;; Blank out the first parked cell that belongs to `control`.
        forget! (fn [control]
                  (loop [^AtomicReferenceArray cell (.get root)]
                    (when (and cell (not (identical? cell done)))
                      (if (identical? control (.get cell 1))
                        (doto cell
                          (.set 0 nil)
                          (.set 1 nil)
                          (.set 2 nil))
                        (recur (.get cell 3))))))
        event   (middleware
                 (fn [_try-event]
                   (fn [target resume control nack-group]
                     ;; once this sync is decided, drop its parked cell
                     (add-watch control (Object.)
                                (fn [k watched _old state]
                                  (when (= state :synced)
                                    (forget! control)
                                    (remove-watch watched k))))
                     (let [cell (doto (AtomicReferenceArray. 4)
                                  (.set 0 resume)
                                  (.set 1 control)
                                  (.set 2 nack-group))]
                       (loop [head (.get root)]
                         (if (identical? head done)
                           ;; barrier already broken: fire right away
                           (claim! target resume control nack-group)
                           (do
                             (.set cell 3 head)
                             (when-not (.compareAndSet root head cell)
                               (recur (.get root)))))))))
                 (fn [_check-nack-group]
                   (fn [_event _nack-group]))
                 identity
                 nil)]
    (f event
       (fn []
         (loop [^AtomicReferenceArray head (.get root)]
           (when-not (identical? head done)
             (if (.compareAndSet root head done)
               ;; we broke the barrier: wake everything that was parked
               (loop [cell head]
                 (when cell
                   (let [resume     (.get cell 0)
                         control    (.get cell 1)
                         nack-group (.get cell 2)]
                     (when (and resume control nack-group)
                       (claim! event resume control nack-group))
                     (recur (.get cell 3)))))
               (recur (.get root)))))))))
(defn choose
  "Takes a collection of events and returns an event that, on
  synchronization, non-deterministically selects one of them. The order in
  which the alternatives are tried is reshuffled on every push-down.

  A nil or empty `evts` yields an event with no alternatives (it never
  becomes ready)."
  [evts]
  (middleware
   identity
   identity
   (fn [push-down]
     (fn [_event ctor ^List lst]
       ;; `shuffle` copies its argument into an ArrayList and throws a
       ;; NullPointerException on nil (e.g. `(choose (vals {}))`), so
       ;; normalise to a vector first.
       (doseq [e (shuffle (vec evts))]
         (push-down e ctor lst))))
   nil))
(defn nack
  "Like guard, but f is passed an event G. G becomes enabled if, on
  synchronization, the chosen event is not the one returned by f.

  Every event produced by f is tagged with a fresh id that is added to the
  nack-group on try-event; when check-nack-group later sees a nack-group
  without that id, G's barrier is broken."
  [f]
  (let [expand (fn [push-down]
                 (fn [_event ctor ^List lst]
                   (barrier
                    (fn [negative-ack fire-nack]
                      (let [inner (f negative-ack)
                            id    (Object.)
                            tag   (fn [evt]
                                    (middleware
                                     (fn [try-event]
                                       (fn [event resume control nack-group]
                                         (try-event event
                                                    resume
                                                    control
                                                    (conj nack-group id))))
                                     (fn [check-nack-group]
                                       (fn [event nack-group]
                                         (when-not (contains? nack-group id)
                                           (fire-nack))
                                         (check-nack-group event nack-group)))
                                     identity
                                     evt))]
                        (push-down inner (comp ctor tag) lst))))))]
    (middleware identity identity expand nil)))
(defn guard
  "Takes a function of no arguments f and returns an event E. Each time E is
  pushed down for a synchronization, f is invoked and the event it returns
  is used in E's place."
  [f]
  (let [expand (fn [push-down]
                 (fn [_event ctor ^List lst]
                   (let [replacement (f)]
                     (push-down replacement ctor lst))))]
    (middleware identity identity expand nil)))
(defn wrap
  "Returns an event like `evt` whose synchronization value is passed through
  the post-synchronization action `f` before it reaches the resume
  function."
  [evt f]
  (letfn [(with-post-action [resume]
            (fn [event value nack-group]
              (resume event (f value) nack-group)))]
    (middleware
     (fn [try-event]
       (fn [target resume control nack-group]
         (try-event target (with-post-action resume) control nack-group)))
     identity
     identity
     evt)))
;; Lazily created single-threaded scheduled executor whose worker is a
;; daemon thread; deref it to get the executor.
(defonce scheduler
  (delay
    (let [^java.util.concurrent.ThreadFactory daemon-threads
          (reify
            java.util.concurrent.ThreadFactory
            (newThread [_ r]
              ;; TODO name
              (let [t (Thread. r)]
                (.setDaemon t true)
                t)))]
      (java.util.concurrent.Executors/newScheduledThreadPool 1 daemon-threads))))
(defn timeout
  "Returns an event that becomes enabled `delay` milliseconds after this
  function was called. The deadline is fixed here; each synchronization
  schedules the wake-up for whatever time is left, and cancels the scheduled
  task if some other event is chosen. Once the deadline has passed the event
  behaves like `always`."
  [delay]
  (let [start    (System/nanoTime)
        span-ns  (* 1000000 (long delay))
        deadline (+ span-ns start)]
    (barrier
     (fn [expired fire]
       (nack
        (fn [not-chosen]
          (let [remaining (- deadline (System/nanoTime))]
            (if-not (pos? remaining)
              always
              (let [task (.schedule ^java.util.concurrent.ScheduledExecutorService @scheduler
                                    ^Runnable fire
                                    remaining
                                    java.util.concurrent.TimeUnit/NANOSECONDS)]
                (-sync (wrap not-chosen (fn [_] (future-cancel task))))
                expired)))))))))
;; A channel is a thin wrapper around a 7-slot object array:
;;   0  ReentrantLock guarding the other slots
;;   1  closed? flag
;;   2  HashMap from control -> most recent entry parked under that control
;;   3  writers head    4  writers tail
;;   5  readers head    6  readers tail
(deftype Channel [fields])

(defn channel
  "Creates a new, open channel with empty reader and writer queues."
  []
  (let [slots (object-array 7)]
    (aset slots 0 (ReentrantLock.))
    (aset slots 1 false)
    (aset slots 2 (java.util.HashMap.))
    ;; slots 3-6 (queue heads and tails) are left at their initial nil
    (->Channel slots)))
(defn close!
  "Marks `channel` as closed. For each entry currently stored in the
  channel's controls map whose control can still be claimed, the control is
  moved to :synced and the entry's resume function is called with a nil
  event and a nil value. Closing an already closed channel does nothing.
  Returns nil."
  [^Channel channel]
  (let [^objects fields (.-fields channel)
        ^ReentrantLock lock (aget fields 0)
        ^java.util.HashMap controls (aget fields 2)
        ;; Snapshot the parked entries under the lock; the finally makes
        ;; sure the lock is released even if something in here throws.
        pending (do
                  (.lock lock)
                  (try
                    (when-not (aget fields 1)
                      (aset fields 1 true)
                      (into [] (vals controls)))
                    (finally
                      (.unlock lock))))]
    ;; Resume outside the lock: claiming a control fires its watches, and
    ;; those take the channel lock themselves.
    (doseq [^objects item pending
            :let [r (aget item 0)
                  c (aget item 1)
                  n (aget item 2)]
            :when (compare-and-set! c :waiting :claimed)]
      (compare-and-set! c :claimed :synced)
      (r nil nil n))))
(defn exchange
  "Builds the event behind `tx` and `rx`.

  `writer-index` is the channel slot holding the head of the queue this
  operation parks itself in (the tail lives at the next slot);
  `reader-index` is the head slot of the opposite queue it tries to pair up
  with. `value` is what the matched peer is resumed with; this side is
  resumed with the peer's value, or with nil when the channel is closed.

  Parked entries are 8-slot arrays:
    0 resume  1 control  2 nack-group  3 value
    4 next / 5 prev in the channel queue
    6 next / 7 prev in the list of entries sharing the same control"
  [^Channel channel writer-index reader-index value]
  (let [^objects channel (.-fields channel)
        writer-index (long writer-index)
        reader-index (long reader-index)
        ^ReentrantLock lock (aget channel 0)
        ^java.util.HashMap controls (aget channel 2)]
    (middleware
     (fn [try-event]
       (fn [event resume control nack-group]
         ;; TODO rename clean-up
         (letfn [(clean-up [k r os ns]
                   ;; Still waiting: try to pair with the head of the
                   ;; opposite queue.
                   (when (= ns :waiting)
                     (.lock lock)
                     (let [^objects item (aget channel reader-index)]
                       (if item
                         (let [peer-resume     (aget item 0)
                               peer-control    (aget item 1)
                               peer-nack-group (aget item 2)
                               peer-value      (aget item 3)]
                           (if (identical? peer-control control)
                             (do
                               (.unlock lock)
                               ;; TODO figure out how this should really work
                               (throw (ex-info "Same operation on same channel?" {})))
                             (if (compare-and-set! peer-control :waiting :claimed)
                               (if (compare-and-set! control :waiting :claimed)
                                 (do
                                   (.unlock lock)
                                   (compare-and-set! peer-control :claimed :synced)
                                   (compare-and-set! control :claimed :synced)
                                   (peer-resume event value peer-nack-group)
                                   (resume event peer-value nack-group))
                                 (do
                                   ;; we lost our own control: release the peer
                                   (compare-and-set! peer-control :claimed :waiting)
                                   (.unlock lock)))
                               (.unlock lock))))
                         (if (aget channel 1)
                           (do
                             (.unlock lock)
                             (when (compare-and-set! control :waiting :synced)
                               ;; ?
                               (resume event nil nack-group)))
                           (.unlock lock)))))
                   ;; Sync decided: remove every entry parked under this
                   ;; control from the channel.
                   (when (= ns :synced)
                     (.lock lock)
                     (loop [^objects item (.get controls control)]
                       (when item
                         ;; unlink from the channel queue
                         (when (aget item 4)
                           (aset ^objects (aget item 4) 5 (aget item 5)))
                         (when (aget item 5)
                           (aset ^objects (aget item 5) 4 (aget item 4)))
                         ;; unlink from the per-control list
                         (when (aget item 6)
                           (aset ^objects (aget item 6) 7 (aget item 7)))
                         (when (aget item 7)
                           (aset ^objects (aget item 7) 6 (aget item 6)))
                         (when (identical? (.get controls control) item)
                           (let [n (aget item 6)]
                             (if n
                               (.put controls control n)
                               (.remove controls control))))
                         ;; Fix up the head/tail pointers of both queues.
                         ;; When the removed entry was a tail, the TAIL slot
                         ;; must move back to its prev (this used to
                         ;; overwrite the head slot instead, dropping
                         ;; entries and leaving a stale tail).
                         (doseq [head-index [writer-index reader-index]]
                           (let [head-index (long head-index)
                                 tail-index (inc head-index)]
                             (when (identical? (aget channel head-index) item)
                               (aset channel head-index (aget item 4)))
                             (when (identical? (aget channel tail-index) item)
                               (aset channel tail-index (aget item 5)))
                             (when (nil? (aget channel head-index))
                               (aset channel tail-index nil))))
                         (recur (aget item 6))))
                     (.unlock lock)
                     (when k
                       (remove-watch r k))))]
           (.lock lock)
           (let [k (Object.)
                 entry (doto (object-array 8)
                         (aset 0 resume)
                         (aset 1 control)
                         (aset 2 nack-group)
                         (aset 3 value)
                         (aset 4 nil) ; next
                         (aset 5 (aget channel (inc writer-index))) ; prev
                         (aset 6 (.get controls control)) ; lst next
                         (aset 7 nil) ; lst prev
                         )]
             ;; append to this side's queue
             (when (nil? (aget channel writer-index))
               (aset channel writer-index entry))
             (when (aget channel (inc writer-index))
               (aset ^objects (aget channel (inc writer-index)) 4 entry))
             (aset channel (inc writer-index) entry)
             ;; push onto the front of the per-control list
             (when (some? (.get controls control))
               (aset ^objects (.get controls control) 7 entry))
             (.put controls control entry)
             (add-watch control k clean-up)
             (.unlock lock)
             ;; run once by hand in case a peer is already parked
             (clean-up k control :waiting @control)))))
     identity
     identity
     nil)))
(defn tx
  "Returns an event that, when synchronized, offers the non-nil `value` on
  `channel`. It parks in the writers queue (slot 3) and pairs with the
  readers queue (slot 5)."
  [channel value]
  (assert (some? value))
  (let [writers-head 3
        readers-head 5]
    (exchange channel writers-head readers-head value)))
(defn rx
  "Returns an event that, when synchronized, takes a value from `channel`.
  It parks in the readers queue (slot 5) and pairs with the writers queue
  (slot 3); the matched writer is resumed with true."
  [channel]
  (let [readers-head 5
        writers-head 3]
    (exchange channel readers-head writers-head true)))
(defn sync!
  "Starts a synchronization on `evt` without blocking. With `callback`, the
  callback is wrapped around the event so that it receives the
  synchronization value."
  ([evt]
   (-sync evt))
  ([evt callback]
   (let [with-callback (wrap evt callback)]
     (-sync with-callback))))
(defn sync!!
  "Synchronizes on `evt` and blocks the calling thread until a value is
  available, then returns that value."
  [evt]
  (let [result (promise)]
    ;; a promise is a function that delivers its argument to itself
    (sync! evt result)
    (deref result)))
(defn poll!
  "Non-blocking attempt to synchronize on `evt`. If `evt` is chosen,
  `success` is called with its value; otherwise the barrier event is enabled
  right after the sync has been started and `failure` is called with no
  arguments."
  [evt success failure]
  (barrier
   (fn [b break]
     ;; `choose` takes ONE collection of events; passing the two
     ;; alternatives as separate arguments threw an ArityException.
     (sync!
      (choose [(wrap evt success)
               (wrap b (fn [_] (failure)))]))
     (break))))
(defprotocol Consumer
  "Management of the channels something reads from. In the `pubsub`
  implementation each method returns an event; the command only takes
  effect once that event is synchronized."
  (consume [_ ch]
    "Add `ch` as an input.")
  (unconsume [_ ch]
    "Remove `ch` from the inputs.")
  (unconsume-all [_]
    "Remove every input."))
(defprotocol Producer
  "Management of the channels something writes to, keyed by a selection
  value. In the `pubsub` implementation each method returns an event; the
  command only takes effect once that event is synchronized."
  (subscribe [_ value ch]
    "Register `ch` as an output for the selection value `value`.")
  (unsubscribe [_ value ch]
    "Remove `ch` from the outputs registered for `value`.")
  (unsubscribe-all [_]
    "Remove every output."))
(defmacro let-macro
  "Expands `body` with a set of local, code-walking macros.

  Two-argument form (the one to call): `bindings` is a letfn-style vector of
  definitions. They are evaluated at macro-expansion time (via `eval`) into
  a map from name to function, and `body` is then walked.

  Three-argument form (internal): `bindings` is that map and `k` is a
  continuation that receives the fully expanded form. While walking:
    * a list whose head symbol names a local macro is replaced by the result
      of applying the macro function to the unevaluated argument forms, and
      the result is walked again;
    * quoted forms are returned untouched, bypassing `k`;
    * other non-empty lists and vectors are rebuilt element by element;
    * other non-record collections are rebuilt from their walked seq;
    * everything else is handed to `k` as is."
  ([bindings body]
   (let [macro-names (map first bindings)
         name->fn    (eval `(letfn ~bindings
                              ~(into {}
                                     (map (fn [n] [(list 'quote n) n]))
                                     macro-names)))]
     `(let-macro ~name->fn
                 ~identity
                 ~body)))
  ([bindings k body]
   (let [call? (and (seq? body)
                    (symbol? (first body)))]
     (cond
       ;; call of a local macro: expand it and keep walking the result
       (and call? (contains? bindings (first body)))
       `(let-macro ~bindings ~k ~(apply (get bindings (first body)) (rest body)))

       ;; quoted data is not walked
       (and call? (= "quote" (name (first body))))
       body

       ;; non-empty list or vector: walk the elements left to right,
       ;; collecting the expanded elements in `done`
       (and (or (vector? body) (seq? body)) (seq body))
       (letfn [(step [pending done v]
                 (if (seq pending)
                   `(let-macro ~bindings
                               ~(partial step (rest pending) (conj done v))
                               ~(first pending))
                   (k ((if (vector? body) identity seq) (conj done v)))))]
         `(let-macro ~bindings
                     ~(partial step (rest body) [])
                     ~(first body)))

       ;; maps, sets, empty collections: walk the seq, pour back in
       (and (coll? body) (not (record? body)))
       `(let-macro ~bindings
                   ~(fn [v]
                      (k (into (empty body) v)))
                   ~(seq body))

       :else
       (k body)))))
(defn pubsub
  "Starts a publish/subscribe process and returns an object implementing
  Consumer, Producer and java.io.Closeable.

  Values received from any consumed input channel are passed to `selector`;
  the value is then sent to every output channel subscribed under the
  selector's result. All protocol methods return events (a `tx` on an
  internal command channel) that must be synchronized to take effect.

  The process is written in continuation-passing / state-passing style: a
  step is a function of (k s), where k is called with a value and the new
  state. `let%` and `do%` are local macros that sequence steps through
  `bind%`. The state map holds :inputs (a set of channels) and :outputs (a
  map from selection value to a set of channels), plus per-message scratch
  keys."
  [selector]
  (let [command (channel)]
    (let-macro [(let% [bindings body]
                  (if (seq bindings)
                    (let [[p v & bindings] bindings]
                      `(~'bind% ~v (fn [~p] (~'let% ~bindings ~body))))
                    body))
                (do% [a & things]
                  (if (seq things)
                    `(~'let% [_# ~a] (~'do% ~@things))
                    a))]
      (letfn [(bind% [m b] (fn [k s] (m (fn [x s] ((b x) k s)) s)))
              (return% [v] (fn [k s] (k v s)))
              (call-cc% [f] (fn [k s] ((f (fn [value] (fn [_ s] (k value s)))) k s)))
              (sync% [evt] (fn d [k s] (sync! (wrap evt (fn [v] (k v s))))))
              ;; applies f to the state; the new state is both the value
              ;; and the state handed to the continuation
              (alter% [f & args] (fn [k s] (apply k (repeat 2 (apply f s args)))))
              (send-loop []
                (let% [state (alter% identity)
                       _ (reduce
                          (fn [x c]
                            (let% [^long i x
                                   sent (sync% (tx c (:value-to-send state)))]
                              (if sent
                                (return% (inc i))
                                ;; The send failed (nil value): drop the
                                ;; subscriber, but keep threading the
                                ;; counter. alter% alone would make the next
                                ;; `^long i` the state map.
                                (do% (alter% update-in [:outputs (:selection-value state)] disj c)
                                     (return% i)))))
                          (return% 0)
                          (:output-chans state))]
                  (do% (if (seq (get-in state [:outputs (:selection-value state)]))
                         (return% nil)
                         (alter% update-in [:outputs] dissoc (:selection-value state)))
                       (main-loop))))
              (main-loop []
                (let% [state (alter% assoc
                                     :value-to-send nil
                                     :selection-value nil
                                     :output-chans nil
                                     :new-output-chans nil)
                       [the-chan the-val] (sync%
                                           (choose
                                            (cons
                                             (wrap (rx command) (partial vector command))
                                             (for [i (:inputs state)]
                                               (wrap (rx i) (partial vector i))))))]
                  (cond (= the-chan command)
                        (let [[op & args] the-val]
                          (case op
                            ;; fnil keeps :inputs a set so that the disj
                            ;; calls below work (conj onto nil would build
                            ;; a list)
                            :consume (do% (alter% update-in [:inputs] (fnil conj #{}) (first args))
                                          (main-loop))
                            :unconsume (do% (alter% update-in [:inputs] disj (first args))
                                            (main-loop))
                            :unconsume-all (do% (alter% update-in [:inputs] (constantly #{}))
                                                (main-loop))
                            :subscribe (do% (alter% update-in [:outputs (first args)]
                                                    (fnil conj #{}) (second args))
                                            (main-loop))
                            ;; TODO dissoc empty
                            :unsubscribe (do% (alter% update-in [:outputs (first args)] disj (second args))
                                              (main-loop))
                            :unsubscribe-all (do% (alter% update-in [:outputs] (constantly {}))
                                                  (main-loop))
                            :close-outputs (do% (return%
                                                 (doseq [[_ outputs] (get state :outputs)
                                                         output outputs]
                                                   (close! output)))
                                                (alter% update-in [:outputs] (constantly {}))
                                                (main-loop))))
                        (some? the-val)
                        (let% [d (return% (selector the-val))
                               _ (alter% assoc
                                         :value-to-send the-val
                                         :selection-value d
                                         :output-chans (get-in state [:outputs d])
                                         :new-output-chans #{})]
                          (send-loop))
                        :else
                        ;; nil from an input: stop reading from it
                        (do% (alter% update-in [:inputs] disj the-chan)
                             (main-loop)))))]
        ((main-loop)
         (fn [v _] v)
         {})))
    (reify
      Consumer
      (consume [_ ch]
        (tx command [:consume ch]))
      (unconsume [_ ch]
        (tx command [:unconsume ch]))
      (unconsume-all [_]
        (tx command [:unconsume-all]))
      Producer
      ;; TODO on close?
      (subscribe [_ value ch]
        (tx command [:subscribe value ch]))
      (unsubscribe [_ value ch]
        (tx command [:unsubscribe value ch]))
      (unsubscribe-all [_]
        (tx command [:unsubscribe-all]))
      java.io.Closeable
      (close [_]
        (tx command [:close-outputs])))))
(defn pipe
  "Forwards every value received on channel `in` to channel `out` through a
  private pubsub whose selector always returns nil. Returns nil.

  `consume` and `subscribe` only build events, so both are synchronized
  here; previously the consume event was dropped and the subscribe event
  was returned unsynchronized, which left the pipe inactive."
  [in out]
  (let [p (pubsub (constantly nil))]
    (sync! (consume p in))
    (sync! (subscribe p nil out))
    nil))
;; ---------------------------------------------------------------------
;; Smoke tests / demo, run when the file is loaded.
;; ---------------------------------------------------------------------

;; Only the guarded barrier in the last alternative breaks itself, so the
;; whole choice must synchronize on it and yield 1.
(assert (= 1
           (sync!!
            (choose
             [nil
              (choose [(barrier (fn [e _] e))])
              (barrier (fn [e _] e))
              (nack (fn [s]
                      (sync! s (fn [_] (prn "not chosen")))
                      (barrier (fn [e _] e))))
              (nack (fn [s]
                      (sync! s (fn [_] (prn "A")))
                      (choose
                       [(nack (fn [s]
                                (sync! s (fn [_] (prn "B")))
                                (barrier (fn [e _] e))))
                        (wrap (guard (fn [] (barrier (fn [e s] (s) e))))
                              (constantly 1))])))]))))

(println "here")

;; A writer and a reader meeting on one channel.
(let [c (channel)]
  (sync! (tx c "Hello World") prn)
  (sync! (rx c) (fn [v] (prn v))))

;; The deadline is fixed when the timeout is created, so the second sync
;; happens after it has already passed.
(let [to (timeout 1000)]
  (time (sync!! to))
  (time (sync!! to)))

(println "===============")

;; One input fanned out to two subscribers.
(let [p  (pubsub (constantly nil))
      c1 (channel)
      c2 (channel)
      c3 (channel)]
  (sync! (consume p c1))
  (sync! (subscribe p nil c2))
  (sync! (subscribe p nil c3))
  (sync!! (tx c1 "Hello World"))
  (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
  (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
  )

(sync!! (wrap always (fn [_] (println "Here"))))
;; (defn proposer [input acceptors] | |
;; (letfn [(main-loop [events ballot] | |
;; (sync! | |
;; (wrap | |
;; (choose (vals events)) | |
;; (fn [msg] | |
;; (case (:op msg) | |
;; :propose (let [events (dissoc events :input)] | |
;; (for [a acceptors] | |
;; [a (wrap (tx a ...) | |
;; (fn []))])))))))] | |
;; (main-loop | |
;; {:input (rx input)} | |
;; 0))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment