Skip to content

Instantly share code, notes, and snippets.

@jjttjj
Last active Mar 8, 2021
Embed
What would you like to do?
(defn -add-clause
  "Files one clause under its accessor.

  `cl` is a sequential clause whose first element is the accessor and whose
  remaining elements (operator plus any arguments) form the clause body.
  Returns `accessor->clauses` with that body conj'd onto the set stored under
  the accessor; a new set is started when the accessor has no entry yet."
  [accessor->clauses cl]
  (let [accessor (first cl)
        body     (next cl)]
    ;; fnil supplies an empty set the first time an accessor is seen.
    (update accessor->clauses accessor (fnil conj #{}) body)))
(defn get-matches
  "Returns the set of clauses that `input` satisfies.

  `accessor->clauses` maps an accessor to a collection of clause bodies of the
  form `(op & args)`. For every accessor/clause pair the accessor is applied
  to `input`; when that value is truthy and `(apply op value args)` is also
  truthy, the vector `[accessor op & args]` is included in the result.

  Accessors and operators given as symbols are looked up with `resolve` each
  time they are used; the returned vectors keep the original, unresolved
  accessor and operator."
  [accessor->clauses input]
  (let [as-fn   (fn [x] (if (symbol? x) (resolve x) x))
        matched (fn [accessor [op & args]]
                  (let [v ((as-fn accessor) input)]
                    ;; A nil/false value short-circuits before the operator
                    ;; is resolved or applied.
                    (when (and v (apply (as-fn op) v args))
                      (cond-> [accessor op] args (into args)))))]
    (into #{}
          (mapcat (fn [[accessor clauses]]
                    (keep #(matched accessor %) clauses)))
          accessor->clauses)))
(defn qpub
  "Builds a query publication on top of channel `ch`.

  Returns a map with two namespaced keys:
    ::state - an atom (initially `{}`) holding the registered clauses
    ::pub   - an `a/pub` over `ch` whose topic for each message is the set
              returned by `get-matches` against the current value of ::state"
  [ch]
  (let [state    (atom {})
        ;; The atom is dereferenced per message, so clauses registered later
        ;; are taken into account for subsequent messages.
        topic-of (fn [msg] (get-matches @state msg))]
    {::state state
     ::pub   (a/pub ch topic-of)}))
(defn qsub
  "Subscribes channel `ch` to the query publication for `clauses`.

  Arguments:
    first   - a map carrying the ::state atom and the ::pub publication
    clauses - a collection of clauses, each `[accessor op & args]`
    ch      - the channel to subscribe

  Every clause is added to the state atom via `-add-clause`, then `ch` is
  subscribed to the publication under the topic `(set clauses)`. Returns the
  value of `a/sub`.

  Known limitation (not actually correct): the subscription topic is compared
  for equality, so a message is delivered only when the set of matched clauses
  equals `(set clauses)` exactly. It needs to match subsets of all matches."
  [{::keys [state pub]} clauses ch]
  ;; Register all clauses in one atomic swap so that readers of the atom never
  ;; observe a partially registered subscription.
  (swap! state #(reduce -add-clause % clauses))
  (a/sub pub (set clauses) ch)) ;;not actually correct, needs to match subsets of all matches
;; Example usage: publish a handful of messages and collect those that satisfy
;; every clause of a single subscription.

;; Source channel that feeds the query publication.
(def in (a/chan))

(def qp (qpub in))

;; Subscription: BTC messages from coinbase/gemini, of type open/done, priced
;; strictly between 50000 and 75000. The clauses are quoted so that `>` and `<`
;; stay symbols.
(def sub1 (qsub qp '[[:venue #{:coinbase :gemini}]
                     [:symbol #{:btc}]
                     [:type #{:open :done}]
                     [:price > 50000]
                     [:price < 75000]]
                (a/chan 100)))

(def msgs
  [{:venue :coinbase
    :symbol :btc
    :price 91000
    :type :open}
   {:venue :blah
    :symbol :btc
    :price 20000
    :type :open}
   {:venue :gemini
    :symbol :btc
    :price 52000
    :type :open}
   {:venue :coinbase
    :symbol :btc
    :price 55000
    :type :done}
   {:venue :coinbase
    :symbol :eth
    :price 55000
    :type :done}])

;; Puts every message onto `in`.
(a/onto-chan!! in msgs)

;; Blocks until `sub1` is closed, then returns everything it received.
(a/<!! (a/into [] sub1))
;;=>
;; [{:venue :gemini, :symbol :btc, :price 52000, :type :open}
;;  {:venue :coinbase, :symbol :btc, :price 55000, :type :done}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment