Skip to content

Instantly share code, notes, and snippets.



Created Oct 14, 2017
What would you like to do?
Join transducer
(defn join
"Joins pairs returned by nested transducers based on their key (first item).
Each nested transducer must returns pairs made of a strictly increasing key and a value.
Emits pairs made of the join key and a vector of the joined values (nil when none).
Values in the vector appears in the same order as the xform that produced them."
([] identity)
([xform] xform)
([xform1 xform2]
(fn [rf]
(let [vq1 (volatile! clojure.lang.PersistentQueue/EMPTY)
vq2 (volatile! clojure.lang.PersistentQueue/EMPTY)
log (fn [q x] (or (some-> q (conj x)) (reduced clojure.lang.PersistentQueue/EMPTY)))
rf1 (xform1 log)
rf2 (xform2 log)]
([] (rf))
([acc] ; todo flush queues
(rf acc))
([acc x]
(let [q1 (rf1 @vq1 x)
q2 (rf2 @vq2 x)]
; todo: think about reduced
(loop [acc acc
q1' (if (reduced? q1)
(if (seq @q1)
(into @q1 (x/for [[k _] %] [k nil]) (unreduced q2)))
q2' (if (reduced? q2)
(if (seq @q2)
(into @q2 (x/for [[k _] %] [k nil]) (unreduced q1)))
(if (reduced? acc)
(if-some [[k1 x1] (peek q1')]
(if-some [[k2 x2] (peek q2')]
(let [cmp (compare k1 k2)]
(neg? cmp) (recur (rf acc [k1 [x1 nil]]) (pop q1') q2')
(pos? cmp) (recur (rf acc [k2 [nil x2]]) q1' (pop q2'))
:else (recur (rf acc [k1 [x1 x2]]) (pop q1') (pop q2'))))
(vreset! vq1 q1')
(vreset! vq2 (when-not (reduced? q2) q2'))
(vreset! vq1 (when-not (reduced? q1) q1'))
(vreset! vq2 q2')
(and (reduced? q1) (reduced? q2)) ensure-reduced)))))))
([xform1 xform2 & xforms3+]
(fn [jxf xf]
(join jxf xf)
(x/for [[k [v x]] %] [k (conj v x)])))
(join xform1 xform2) xforms3+)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment