;; A data.avl-based join transducer, as discussed in the hallway track of Clojure/conj 2018.
;; REPL-time dependency loading: fetch the two libraries, then alias them.
;; NOTE(review): `alembic/distill` assumes an `alembic` alias is already
;; available in the REPL session (presumably for alembic.still) — confirm.
(alembic/distill '[org.clojure/data.avl "0.0.17"])
(alembic/distill '[net.cgrand/xforms "0.12.1"])

(require '[clojure.data.avl :as avl]
         '[net.cgrand.xforms :as x])
(defn join
  "Returns a transducer that joins the outputs of several transducers on a key.

  keyfn      - function applied to every item produced by the transducers in
               xforms-map; its result is the join key. Items are buffered in
               data.avl sorted maps keyed by this value, so join keys must be
               mutually comparable.
  xforms-map - map of tag -> transducer. It is handed to x/multiplex, so every
               input is run through each transducer and each output reaches
               the join step tagged as a [tag item] pair.

  Whenever every tag has produced an item with the same join key, a single map
  of tag -> item is passed downstream. After such an emission, all items that
  are buffered under a key <= the emitted key are dropped from every buffer
  and will never be joined. NOTE(review): this looks like a merge join that
  presumably expects each stream to yield keys in ascending order — confirm."
  [keyfn xforms-map]
  (comp
    (x/multiplex xforms-map)
    (fn join-xform [rf]
      (let [xform-keys (vec (keys xforms-map))
            ;; Number of streams that must agree on a key; invariant per join.
            n-xforms   (count xforms-map)
            ;; tag -> sorted map of join-key -> buffered item, one per stream.
            ms         (volatile! (zipmap xform-keys (repeatedly #(avl/sorted-map))))]
        (fn join-rf
          ([] (rf))
          ([end] (rf end))
          ([acc [xform-key x]]
           (let [k   (keyfn x)
                 ;; Only the *other* streams' buffers can complete the join.
                 ms' (dissoc @ms xform-key)
                 ;; Start from the incoming item and add every other stream's
                 ;; buffered item that shares the same join key.
                 maybe-output
                 (reduce-kv (fn join-rf-collect [out xform-key' m']
                              (let [e (find m' k)]
                                (cond-> out
                                  e (assoc xform-key' (val e)))))
                            {xform-key x}
                            ms')]
             (if (== n-xforms (count maybe-output))
               (do
                 ;; Complete match: keep only entries with keys strictly
                 ;; greater than k in every buffer, then emit the joined row.
                 (doseq [xk xform-keys]
                   (vswap! ms update xk avl/subrange > k))
                 (rf acc maybe-output))
               (do
                 ;; Incomplete: remember this item until the others catch up.
                 (vswap! ms update xform-key assoc k x)
                 acc)))))))))
;; Usage examples for `join`; the #_ forms record the expected results.
(comment

  ;; Three streams derived from the same input, scaled by 2, 3 and 4.
  ;; A row is emitted only for :foo values that all three streams produce.
  (transduce
    (join :foo {:x (map #(update % :foo * 2))
                :y (map #(update % :foo * 3))
                :z (map #(update % :foo * 4))})
    conj
    []
    (map #(hash-map :foo %) (range 30)))
  #_[{:z {:foo 0}, :x {:foo 0}, :y {:foo 0}}
     {:x {:foo 12}, :y {:foo 12}, :z {:foo 12}}
     {:x {:foo 24}, :y {:foo 24}, :z {:foo 24}}
     {:x {:foo 36}, :y {:foo 36}, :z {:foo 36}}
     {:x {:foo 48}, :y {:foo 48}, :z {:foo 48}}]

  ;; Same join via `sequence`; each item is additionally marked with the
  ;; stream it came from, showing that the joined values keep their identity.
  (sequence
    (join :foo {:x (map #(-> (update % :foo * 2) (assoc :x true)))
                :y (map #(-> (update % :foo * 3) (assoc :y true)))
                :z (map #(-> (update % :foo * 4) (assoc :z true)))})
    (map #(hash-map :foo %) (range 30)))
  #_({:z {:z true, :foo 0}, :x {:foo 0, :x true}, :y {:y true, :foo 0}}
     {:x {:foo 12, :x true}, :y {:y true, :foo 12}, :z {:z true, :foo 12}}
     {:x {:foo 24, :x true}, :y {:y true, :foo 24}, :z {:z true, :foo 24}}
     {:x {:foo 36, :x true}, :y {:y true, :foo 36}, :z {:z true, :foo 36}}
     {:x {:foo 48, :x true}, :y {:y true, :foo 48}, :z {:z true, :foo 48}})

  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment