Created Oct 14, 2017
Join transducer
(defn join
"Joins pairs returned by nested transducers based on their key (first item).
Each nested transducer must returns pairs made of a strictly increasing key and a value.
Emits pairs made of the join key and a vector of the joined values (nil when none).
Values in the vector appears in the same order as the xform that produced them."
([] identity)
([xform] xform)
([xform1 xform2]
(fn [rf]
(let [vq1 (volatile! clojure.lang.PersistentQueue/EMPTY)
vq2 (volatile! clojure.lang.PersistentQueue/EMPTY)
log (fn [q x] (or (some-> q (conj x)) (reduced clojure.lang.PersistentQueue/EMPTY)))
rf1 (xform1 log)
rf2 (xform2 log)]
([] (rf))
([acc] ; todo flush queues
(rf acc))
([acc x]
(let [q1 (rf1 @vq1 x)
q2 (rf2 @vq2 x)]
; todo: think about reduced
(loop [acc acc
q1' (if (reduced? q1)
(if (seq @q1)
(into @q1 (x/for [[k _] %] [k nil]) (unreduced q2)))
q2' (if (reduced? q2)
(if (seq @q2)
(into @q2 (x/for [[k _] %] [k nil]) (unreduced q1)))
(if (reduced? acc)
(if-some [[k1 x1] (peek q1')]
(if-some [[k2 x2] (peek q2')]
(let [cmp (compare k1 k2)]
(neg? cmp) (recur (rf acc [k1 [x1 nil]]) (pop q1') q2')
(pos? cmp) (recur (rf acc [k2 [nil x2]]) q1' (pop q2'))
:else (recur (rf acc [k1 [x1 x2]]) (pop q1') (pop q2'))))
(vreset! vq1 q1')
(vreset! vq2 (when-not (reduced? q2) q2'))
(vreset! vq1 (when-not (reduced? q1) q1'))
(vreset! vq2 q2')
(and (reduced? q1) (reduced? q2)) ensure-reduced)))))))
([xform1 xform2 & xforms3+]
(fn [jxf xf]
(join jxf xf)
(x/for [[k [v x]] %] [k (conj v x)])))
(join xform1 xform2) xforms3+)))
