Skip to content

Instantly share code, notes, and snippets.

@jjttjj
Created August 23, 2019 17:53
Show Gist options
  • Save jjttjj/049f600e033cae9057dfe32fcb1e9bc0 to your computer and use it in GitHub Desktop.
Save jjttjj/049f600e033cae9057dfe32fcb1e9bc0 to your computer and use it in GitHub Desktop.
(defn mkput
  "Builds the single-argument put function for an accumulator.

  a   - the state value handed to rf as its accumulator argument
  p   - promise delivered once rf signals completion
  rf  - two-argument reducing function, called as (rf a x)
  cbs - derefable holding a collection of one-argument callbacks

  The returned function yields false without calling rf when p is
  already realized. Otherwise it calls (rf a x) and yields true. When
  that call returns a reduced value, the value is unwrapped and
  dereferenced once more, every callback in cbs is called with the
  outcome, and only then is p delivered with it."
  [a p rf cbs]
  (fn [x]
    (if-not (realized? p)
      (let [step (rf a x)]
        (when (reduced? step)
          ;; first deref unwraps the reduced, second derefs what it wrapped
          (let [final-val (deref (deref step))]
            ;; callbacks run before the promise is delivered
            (doseq [callback @cbs]
              (callback final-val))
            (deliver p final-val)))
        true)
      false)))
(defn acc
  "Accumulates state in an atom, with inputs passed through the
  transducer xf and folded into the atom with the reducing function rf.

  With two arguments the initial state is (rf); otherwise it is init.

  Returns a map with the keys:
    :a           - the atom holding the accumulated state
    :p           - a promise; the put function built by mkput delivers
                   it when the transformed reducing function returns a
                   reduced value
    :put!        - one-argument function (built by mkput) that feeds a
                   value through xf into the atom
    :register-cb - one-argument function that adds a callback; it
                   asserts that :p has not been realized yet"
  ([xf rf] (acc xf rf (rf)))
  ([xf rf init]
   (let [state     (atom init)
         ;; innermost step: fold x into the atom with the caller's rf
         ;; and hand the atom itself back as the accumulator
         store!    (fn [state-atom x]
                     (swap! state-atom rf x)
                     state-atom)
         step      (xf store!)
         done      (promise)
         callbacks (atom [])
         register  (fn [f]
                     (assert (not (realized? done)) "Promise is already realized")
                     (swap! callbacks conj f))]
     {:a           state
      :p           done
      :put!        (mkput state done step callbacks)
      :register-cb register})))
(defn put!
  "Puts the value x into the accumulator `this`, subject to its
  transducer, by calling the function stored under :put! with x.
  Returns that function's result: false if the accumulator is already
  complete and not accepting new values, otherwise true."
  [this x]
  (let [put-fn (:put! this)]
    (put-fn x)))
(defn on-realized
  "Registers the callback f on the accumulator `this` by calling the
  function stored under :register-cb with f. Returns that function's
  result."
  [this f]
  (let [register (:register-cb this)]
    (register f)))
;; Record over the accumulator fields whose deref reads the promise p.
(defrecord AccProm [a p register-cb put!]
  clojure.lang.IDeref
  ;; @record derefs the promise p
  (deref [_]
    @p)
  clojure.lang.IBlockingDeref
  ;; timed deref, forwarded to the promise p with the same arguments
  (deref [_ ms fallback]
    (deref p ms fallback)))
(defn acc-prom
  "Builds an accumulator with `acc` and wraps the resulting map in an
  AccProm record. With two arguments the initial state is (rf)."
  ([xf rf] (acc-prom xf rf (rf)))
  ([xf rf init]
   (-> (acc xf rf init)
       map->AccProm)))
;; Record over the accumulator fields whose deref reads the field a.
(defrecord AccAtom [a p register-cb put!]
  clojure.lang.IDeref
  ;; @record derefs a
  (deref [_]
    @a))
(defn acc-atom
  "Builds an accumulator with `acc` and wraps the resulting map in an
  AccAtom record. With two arguments the initial state is (rf).

  Both arities return an AccAtom. The two-argument arity delegates to
  the three-argument arity of this function, not to acc-prom, so the
  record type does not depend on how many arguments were passed."
  ([xf rf] (acc-atom xf rf (rf)))
  ([xf rf init]
   (map->AccAtom (acc xf rf init))))
@jjttjj
Copy link
Author

jjttjj commented Aug 23, 2019

Example usage:

;; Accumulator that increments each input, keeps the odd results and
;; completes after three values have been kept.
(def p (acc-prom (comp (map inc) (filter odd?) (take 3)) conj []))

;; Derefs p on another thread, then prints.
(future @p (println "p is realized"))

;; Inputs become 3 4 5 6 7 after inc; 3, 5 and 7 are odd, so the third
;; kept value arrives with (put! p 6).
(put! p 2)
(put! p 3)
(put! p 4)
(put! p 5)
(put! p 6)

;;p is realized

;; Same transducer as above, wrapped so that deref reads the
;; accumulated state.
(def a (acc-atom (comp (map inc) (filter odd?) (take 3)) conj []))

;; Inputs become 43 101 112 after inc; only 43 and 101 are odd.
(put! a 42)
(put! a 100)
(put! a 111)

@a
;;=> [43 101]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment