Skip to content

Instantly share code, notes, and snippets.

@jjttjj
Created December 2, 2019 19:24
Show Gist options
  • Save jjttjj/f2f3dfed3dd5651863b0b12e44e7dd03 to your computer and use it in GitHub Desktop.
Save jjttjj/f2f3dfed3dd5651863b0b12e44e7dd03 to your computer and use it in GitHub Desktop.
(ns mult
(:require[clojure.core.async :as a]))
(defprotocol MultFn
:extend-via-metadata true
(add-tap [this-fn f] [this-fn xf f]
"Adds a function `f` to 'tapset', a set of functions which are
each called on every value passed to `this-fn`. If a transducer `xf`
is supplied, subjects all values to `xf` before calling `f` on the
result. When a value is reduced, the function is removed from the
tapset. Returns a reference to the added function (which may be
different than `f`) for later removal.")
(remove-tap [this-fn f]
"Removes `f` from tapset"))
(defn mult-impl [handlers]
{`add-tap (fn ([this f]
(swap! handlers conj f)
f)
([this xf f]
(let [xf' (xf (fn [_acc x] x))
f' (fn this-fn [x]
(let [result (xf' ::nothing x)]
(cond
(reduced? result)
(do (remove-tap this this-fn)
(f @result))
(= result ::nothing) nil
:else
(f result))))]
(swap! handlers conj f')
f')))
`remove-tap (fn [this f]
(swap! handlers disj f))})
"Constructs a tap-fn, which can have functions added and removed from it's 'tapset'. Every time the function is called with a value, that value is passed to all values in the tapset."
(defn go-mult
"Creates a multfn which returns immediately when called and calls all
taps in a `core.async/go` processs."
[]
(let [handlers (atom #{})
tapq (a/chan)]
(a/go-loop []
(when-let [x (a/<! tapq)]
(try (doseq [f @handlers]
(f (if (identical? ::tap-nil x) nil x)))
(catch #?(:clj Throwable :cljs js/Object) t
;;todo: error handling
;;(log/error t x)
(println t x)))
(recur)))
(with-meta
(fn [x]
(a/put! tapq (if (nil? x) ::tap-nil x))
nil)
(mult-impl handlers))))
#?(:clj
(defn thread-mult
"Creates a multfn which returns immediately when called and calls all
taps in a seperate thread processs."
[]
(let [handlers (atom #{})
tapq (a/chan)]
(a/thread
(loop []
(when-let [x (a/<!! tapq)]
(try (doseq [f @handlers]
(f (if (identical? ::tap-nil x) nil x)))
(catch #?(:clj Throwable :cljs js/Object) t
;;todo: error handling
;;(log/error t x)
(println t x)))
(recur))))
(with-meta
(fn [x]
(a/put! tapq (if (nil? x) ::tap-nil x))
nil)
(mult-impl handlers)))))
(defn sync-mult
"Creates a multfn which blocks when called until all taps are complete"
[]
(let [handlers (atom #{})]
(with-meta
(fn [x]
(try (doseq [f @handlers]
(f x))
(catch #?(:clj Throwable :cljs js/Object) t
;;todo: error handling
;;(log/error t x)
(println t x)))
nil)
(mult-impl handlers))))
;;Usage
(def m1 (go-mult))
(add-tap m1
#(println "received value in m1:" %))
(add-tap m1 (filter even?)
#(println "received even value in m1:" %))
(add-tap m1 (comp (filter odd?) (partition-all 2))
#(println "received two odd values:" %))
(def m2 (sync-mult))
(add-tap m1 (take 5) m2)
(add-tap m2 (map #(* 100 %)) #(println "m2 received" %))
(doseq [x (range 10)]
(m1 x))
;;prints:
;; received value in m1: 0
;; received even value in m1: 0
;; received value in m1: 1
;; received value in m1: 2
;; received even value in m1: 2
;; received value in m1: 3
;; received two odd values: [1 3]
;; received value in m1: 4
;; received even value in m1: 4
;; received value in m1: 5
;; received value in m1: 6
;; received even value in m1: 6
;; received value in m1: 7
;; received two odd values: [5 7]
;; received value in m1: 8
;; received even value in m1: 8
;; received value in m1: 9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment