Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created October 18, 2022 20:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hiredman/19e01f112b52e3a967af597262de3a81 to your computer and use it in GitHub Desktop.
Save hiredman/19e01f112b52e3a967af597262de3a81 to your computer and use it in GitHub Desktop.
(import '(java.util ArrayList
List)
'(java.util.concurrent.locks ReentrantLock)
'(java.util.concurrent.atomic AtomicReference
AtomicReferenceArray))
(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)
;; TODO: should you be able to sync multiple times on the same event
;; object, seems like no?.
(defprotocol Event
(try-event [event resume control nack-group]
"try to sync this event")
;; TODO split this out
(check-nack-group [event nack-group]))
(defprotocol Syncable
(-sync [event]))
(defprotocol QuasiEvent
(push-down [event ctor lst]))
(extend-type nil
Event
(try-event [event resume control nack-group])
(check-nack-group [event nack-group])
QuasiEvent
(push-down [event ctor lst]
(when-let [evt (ctor event)]
(.add ^java.util.List lst evt)))
Syncable
(-sync [event]))
(extend-type Object
QuasiEvent
(push-down [event ctor lst]
(.add ^List lst (ctor event)))
Syncable
(-sync [event]
;; wrap list access in a volatile to ensure ordering, may not be
;; required.
(let [lst (volatile! (ArrayList.))
control (atom :waiting)]
(push-down event identity @lst)
(vreset! lst @lst)
(letfn [(resume-f [event value nack-group]
;; resume-f could end up invoked on another thread,
;; hence the volatile
(doseq [e @lst]
(check-nack-group e nack-group)))]
(doseq [evt @lst
:when evt]
(try-event evt resume-f control #{}))))))
(def always
(reify
Event
(try-event [event resume control nack-group]
(let [k (Object.)
f (fn [& _]
(when (= @control :synced)
(remove-watch control k))
(when (compare-and-set! control :waiting :synced)
(resume event true nack-group)))]
(add-watch control k f)
(f)))
(check-nack-group [event nack-group])))
(extend-type java.util.concurrent.CompletionStage
Event
(try-event [^java.util.concurrent.CompletionStage event resume control nack-group]
(.handle event
(reify
java.util.function.BiFunction
(apply [_ result exception]
(let [k (Object.)
f (fn [& _]
(when (= @control :synced)
(remove-watch control k))
(when (compare-and-set! control :waiting :synced)
(resume event (or exception result) nack-group)))]
(add-watch control k f)
(f))))))
(check-nack-group [event nack-group]))
(declare middleware)
;; Middleware wraps an event, passing protocols callls down with the
;; opportunity to intercept and make changes. The middleware smart
;; constructor can collapse multiple middlewares into a single
;; middleware via function composaition.
(defrecord Middleware [try-event-transform
check-nack-group-transform
push-down-transform
target]
Event
(try-event [_ resume control nack-group]
((try-event-transform try-event) target resume control nack-group))
(check-nack-group [_ nack-group]
((check-nack-group-transform check-nack-group) target nack-group))
QuasiEvent
(push-down [event ctor lst]
((push-down-transform push-down)
target
(comp ctor #(middleware try-event-transform
check-nack-group-transform
push-down-transform
%))
lst)))
(defn middleware
[try-event-transform check-nack-group-transform push-down-transform target]
(if (instance? Middleware target)
(let [^Middleware target target]
(middleware (comp try-event-transform (.-try-event-transform target))
(comp check-nack-group-transform (.-check-nack-group-transform target))
(comp push-down-transform (.-push-down-transform target))
(.-target target)))
(->Middleware try-event-transform check-nack-group-transform push-down-transform target)))
(defn barrier
"Given a function f, applies f to a function g and an event e. e becomes
enabled when g is invoked. returns the result of invoking f."
[f]
(let [root (AtomicReference.)
done (Object.)
event (middleware
(fn [event-transform]
(fn [event resume control nack-group]
(add-watch control (Object.) (fn [k r os ns]
(when (= ns :synced)
(loop [^AtomicReferenceArray r (.get root)]
(when r
(when-not (identical? r done)
(if (identical? control (.get r 1))
(doto r
(.set 0 nil)
(.set 1 nil)
(.set 2 nil))
(recur (.get r 3))))))
(remove-watch r k))))
(let [cell (doto (AtomicReferenceArray. 4)
(.set 0 resume)
(.set 1 control)
(.set 2 nack-group))]
(loop [r (.get root)]
(if (identical? r done)
(loop []
(if (compare-and-set! control :waiting :synced)
(resume event nil nack-group)
(when-not (= @control :synced)
(recur))))
(do
(.set cell 3 r)
(when-not (.compareAndSet root r cell)
(recur (.get root)))))))))
(fn [check-nack-group]
(fn [event nack-group]))
identity
nil)]
(f event
(fn []
(loop [^AtomicReferenceArray r (.get root)]
(when-not (identical? r done)
(if-not (.compareAndSet root r done)
(recur (.get root))
(loop [r r]
(when r
(let [resume (.get r 0)
control (.get r 1)
nack-group (.get r 2)]
(when (and resume control nack-group)
(loop []
(if (compare-and-set! control :waiting :synced)
(resume event nil nack-group)
(when-not (= @control :synced)
(recur)))))
(recur (.get r 3))))))))))))
(defn choose
"Takes a list of events, and non-deterministically selects one on synchronization"
[evts]
(middleware
identity
identity
(fn [push-down]
(fn [event ctor ^List lsg]
(doseq [e (shuffle evts)]
(push-down e ctor lsg))))
nil))
(defn nack
"Like guard, but f is passed an event G. G will be enabled if on
synchronization, the chosen event is not the one returned by f."
[f]
(middleware
identity
identity
(fn [push-down]
(fn [event ctor ^List lsg]
(barrier
(fn [b break]
(let [e (f b)
id (Object.)]
(push-down
e
(comp ctor
(fn g [evt]
(middleware
(fn [try-event]
(fn [event resume control nack-group]
(try-event event resume control (conj nack-group id))))
(fn [check-nack-group]
(fn [event nack-group]
(when-not (contains? nack-group id)
(break))
(check-nack-group event nack-group)))
identity
evt)))
lsg))))))
nil))
(defn guard
"Takes a function of no arguments f, returns an event E, before E is
synchronized on, f will be invoked, and E replaced with the event
returned by f."
[f]
(middleware
identity
identity
(fn [push-down]
(fn [event ctor ^List lst]
(push-down (f) ctor lst)))
nil))
(defn wrap
"Wraps the post synchronization action f around the event evt"
[evt f]
(middleware
(fn [try-event]
(fn [evt resume control nack-group]
(try-event evt
(fn [event value nack-group]
(resume event (f value) nack-group))
control
nack-group)))
identity
identity
evt))
(defonce scheduler
(delay
(java.util.concurrent.Executors/newScheduledThreadPool
1
(reify
java.util.concurrent.ThreadFactory
(newThread [_ r]
;; TODO name
(doto (Thread. r)
(.setDaemon true)))))))
(defn timeout [delay]
(let [now (System/nanoTime)
n (* 1000000 (long delay))
deadline (+ n now)]
(barrier
(fn [b break]
(nack
(fn [neg]
(let [now (System/nanoTime)
delay (- deadline now)]
(if (pos? delay)
(let [f (.schedule ^java.util.concurrent.ScheduledExecutorService @scheduler
^Runnable break
delay
java.util.concurrent.TimeUnit/NANOSECONDS)]
(-sync (wrap neg (fn [_] (future-cancel f))))
b)
always))))))))
(deftype Channel [fields])
(defn channel []
(->Channel
(doto (object-array 7)
(aset 0 (ReentrantLock.))
(aset 1 false)
(aset 2 (java.util.HashMap.))
(aset 3 nil) ; writers head
(aset 4 nil) ; writers tail
(aset 5 nil) ; readers head
(aset 6 nil) ; readers tail
)))
(defn close! [^Channel channel]
(let [^objects channel (.-fields channel)
^ReentrantLock lock (aget channel 0)
_ (.lock lock)
^java.util.HashMap controls (aget channel 2)]
(if-not (aget channel 1)
(do
(aset channel 1 true)
(let [items (into [] (vals controls))]
(.unlock lock)
(doseq [^objects item items
:let [r (aget item 0)
c (aget item 1)
n (aget item 2)
v (aget item 3)]
:when (compare-and-set! c :waiting :claimed)]
(compare-and-set! c :claimed :synced)
(r nil nil n))))
(.unlock lock))))
(defn exchange [^Channel channel writer-index reader-index value]
(let [^objects channel (.-fields channel)
writer-index (long writer-index)
reader-index (long reader-index)
^ReentrantLock lock (aget channel 0)
^java.util.HashMap controls (aget channel 2)]
(middleware
(fn [try-event]
(fn [event resume control nack-group]
;; TODO rename clean-up
(letfn [(clean-up [k r os ns]
(when (= ns :waiting)
(.lock lock)
(let [^objects item (aget channel reader-index)]
(if item
(let [r (aget item 0)
c (aget item 1)
n (aget item 2)
v (aget item 3)]
(if (identical? c control)
(do
(.unlock lock)
;; TODO figure out how this should really work
(throw (ex-info "Same operation on same channel?" {})))
(if (compare-and-set! c :waiting :claimed)
(if (compare-and-set! control :waiting :claimed)
(do
(.unlock lock)
(compare-and-set! c :claimed :synced)
(compare-and-set! control :claimed :synced)
(r event value n)
(resume event v nack-group))
(do
(compare-and-set! c :claimed :waiting)
(.unlock lock)))
(.unlock lock))))
(if (aget channel 1)
(do
(.unlock lock)
(when (compare-and-set! control :waiting :synced)
;; ?
(resume event nil nack-group)))
(.unlock lock)))))
(when (= ns :synced)
(.lock lock)
(loop [^objects item (.get controls control)]
(when item
(when (aget item 4)
(aset ^objects (aget item 4) 5 (aget item 5)))
(when (aget item 5)
(aset ^objects (aget item 5) 4 (aget item 4)))
(when (aget item 6)
(aset ^objects (aget item 6) 7 (aget item 7)))
(when (aget item 7)
(aset ^objects (aget item 7) 6 (aget item 6)))
(when (identical? (.get controls control) item)
(let [n (aget item 6)]
(if n
(.put controls control n)
(.remove controls control))))
(doseq [writer-index [writer-index reader-index]]
(when (identical? (aget channel writer-index) item)
(aset channel writer-index (aget item 4)))
(when (identical? (aget channel (inc (long writer-index))) item)
(aset channel writer-index (aget item 5)))
(when (nil? (aget channel writer-index))
(aset channel (inc (long writer-index)) nil)))
(recur (aget item 6))))
(.unlock lock)
(when k
(remove-watch r k))))]
(.lock lock)
(let [k (Object.)
entry (doto (object-array 8)
(aset 0 resume)
(aset 1 control)
(aset 2 nack-group)
(aset 3 value)
(aset 4 nil) ; next
(aset 5 (aget channel (inc writer-index))) ; prev
(aset 6 (.get controls control)) ; lst next
(aset 7 nil) ; lst prev
)]
(when (nil? (aget channel writer-index))
(aset channel writer-index entry))
(when (aget channel (inc writer-index))
(aset ^objects (aget channel (inc writer-index)) 4 entry))
(aset channel (inc writer-index) entry)
(when (some? (.get controls control))
(aset ^objects (.get controls control) 7 entry))
(.put controls control entry)
(add-watch control k clean-up)
(.unlock lock)
(clean-up k control :waiting @control)))))
identity
identity
nil)))
(defn tx [channel value]
(assert (some? value))
(exchange channel 3 5 value))
(defn rx [channel]
(exchange channel 5 3 true))
(defn sync!
([evt]
(-sync evt))
([evt callback]
(-sync (wrap evt callback))))
(defn sync!! [evt]
(let [p (promise)]
(sync! evt p)
@p))
(defn poll! [evt success failure]
(barrier
(fn [b break]
(sync!
(choose (wrap evt success)
(wrap b (fn [_] (failure)))))
(break))))
(defprotocol Consumer
(consume [_ ch])
(unconsume [_ ch])
(unconsume-all [_]))
(defprotocol Producer
(subscribe [_ value ch])
(unsubscribe [_ value ch])
(unsubscribe-all [_]))
(defmacro let-macro
([bindings body]
(let [names (map first bindings)]
`(let-macro ~(eval `(letfn ~bindings ~(into {} (map (fn [n] [(list 'quote n) n])) names)))
~identity
~body)))
([bindings k body]
(cond (and (seq? body)
(symbol? (first body))
(contains? bindings (first body)))
`(let-macro ~bindings ~k ~(apply (get bindings (first body)) (rest body)))
(and (seq? body)
(symbol? (first body))
(= "quote" (name (first body))))
body
(and (or (vector? body) (seq? body)) (seq body))
`(let-macro ~bindings
~(partial (fn f [i o v]
(if (seq i)
`(let-macro ~bindings
~(partial f (rest i) (conj o v))
~(first i))
(k ((if (vector? body) identity seq) (conj o v)))))
(rest body)
[])
~(first body))
(and (coll? body) (not (record? body)))
`(let-macro ~bindings ~(fn [v]
(k (into (empty body) v))) ~(seq body))
:else
(k body))))
(defn pubsub [selector]
(let [command (channel)]
(let-macro [(let% [bindings body]
(if (seq bindings)
(let [[p v & bindings] bindings]
`(~'bind% ~v (fn [~p] (~'let% ~bindings ~body))))
body))
(do% [a & things]
(if (seq things)
`(~'let% [_# ~a] (~'do% ~@things))
a))]
(letfn [(bind% [m b] (fn [k s] (m (fn [x s] ((b x) k s)) s)))
(return% [v] (fn [k s] (k v s)))
(call-cc% [f] (fn [k s] ((f (fn [value] (fn [_ s] (k value s)))) k s)))
(sync% [evt] (fn d [k s] (sync! (wrap evt (fn [v] (k v s))))))
(alter% [f & args] (fn [k s] (apply k (repeat 2 (apply f s args)))))
(send-loop []
(let% [state (alter% identity)
_ (reduce
(fn [x c]
(let% [^long i x
sent (sync% (tx c (:value-to-send state)))]
(if sent
(return% (inc i))
(alter% update-in [:outputs (:selection-value state)] disj c))))
(return% 0)
(:output-chans state))]
(do% (if (seq (get-in state [:outputs (:selection-value state)]))
(return% nil)
(alter% update-in [:outputs] dissoc (:selection-value state)))
(main-loop))))
(main-loop []
(let% [state (alter% assoc
:value-to-send nil
:selection-value nil
:output-chans nil
:new-output-chans nil)
[the-chan the-val] (sync%
(choose
(cons
(wrap (rx command) (partial vector command))
(for [i (:inputs state)]
(wrap (rx i) (partial vector i))))))]
(cond (= the-chan command)
(let [[op & args] the-val]
(case op
:consume (do% (alter% update-in [:inputs] conj (first args))
(main-loop))
:unconsume (do% (alter% update-in [:inputs] disj (first args))
(main-loop))
:unconsume-all (do% (alter% update-in [:inputs] (constantly #{}))
(main-loop))
:subscribe (do% (alter% update-in [:ouputs (first args)]
(fnil conj #{}) (second args))
(main-loop))
;; TODO dissoc empty
:unsubscribe (do% (alter% update-in [:ouputs (first args)] disj (second args))
(main-loop))
:unsubscribe-all (do% (alter% update-in [:ouputs] (constantly {}))
(main-loop))
:close-outputs (do% (return%
(doseq [[_ outputs] (get state :outputs)
output outputs]
(close! output)))
(alter% update-in [:ouputs] (constantly {}))
(main-loop))))
(some? the-val)
(let% [d (return% (selector the-val))
_ (alter% assoc
:value-to-send the-val
:selection-value d
:output-chans (get-in state [:ouputs d])
:new-output-chans #{})]
(send-loop))
:else
(do% (alter% update-in [:inputs] disj the-chan)
(main-loop)))))]
((main-loop)
(fn [v _] v)
{})))
(reify
Consumer
(consume [_ ch]
(tx command [:consume ch]))
(unconsume [_ ch]
(tx command [:unconsume ch]))
(unconsume-all [_]
(tx command [:unconsume-all]))
Producer
;; TODO on close?
(subscribe [_ value ch]
(tx command [:subscribe value ch]))
(unsubscribe [_ value ch]
(tx command [:unsubscribe value ch]))
(unsubscribe-all [_]
(tx command [:unsubscribe-all]))
java.io.Closeable
(close [_]
(tx command [:close-outputs])))))
(defn pipe [in out]
(let [p (pubsub (constantly nil))]
(consume p in)
(subscribe p nil out)))
(assert (= 1
(sync!!
(choose
[nil
(choose [(barrier (fn [e _] e))])
(barrier (fn [e _] e))
(nack (fn [s]
(sync! s (fn [_] (prn "not chosen")))
(barrier (fn [e _] e))))
(nack (fn [s]
(sync! s (fn [_] (prn "A")))
(choose
[(nack (fn [s]
(sync! s (fn [_] (prn "B")))
(barrier (fn [e _] e))))
(wrap (guard (fn [] (barrier (fn [e s] (s) e))))
(constantly 1))])))]))))
(println "here")
(let [c (channel)]
(sync! (tx c "Hello World") prn)
(sync! (rx c) (fn [v] (prn v))))
(let [to (timeout 1000)]
(time (sync!! to))
(time (sync!! to)))
(println "===============")
(let [p (pubsub (constantly nil))
c1 (channel)
c2 (channel)
c3 (channel)]
(sync! (consume p c1))
(sync! (subscribe p nil c2))
(sync! (subscribe p nil c3))
(sync!! (tx c1 "Hello World"))
(assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
(assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
)
(sync!! (wrap always (fn [_] (println "Here"))))
;; (defn proposer [input acceptors]
;; (letfn [(main-loop [events ballot]
;; (sync!
;; (wrap
;; (choose (vals events))
;; (fn [msg]
;; (case (:op msg)
;; :propose (let [events (dissoc events :input)]
;; (for [a acceptors]
;; [a (wrap (tx a ...)
;; (fn []))])))))))]
;; (main-loop
;; {:input (rx input)}
;; 0)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment