Skip to content

Instantly share code, notes, and snippets.

@cgrand
Created October 14, 2017 16:00
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cgrand/f0d72f7509e04552f0d18d2eb6ed8eda to your computer and use it in GitHub Desktop.
Save cgrand/f0d72f7509e04552f0d18d2eb6ed8eda to your computer and use it in GitHub Desktop.
Join transducer
(defn join
"Joins pairs returned by nested transducers based on their key (first item).
Each nested transducer must returns pairs made of a strictly increasing key and a value.
Emits pairs made of the join key and a vector of the joined values (nil when none).
Values in the vector appears in the same order as the xform that produced them."
([] identity)
([xform] xform)
([xform1 xform2]
(fn [rf]
(let [vq1 (volatile! clojure.lang.PersistentQueue/EMPTY)
vq2 (volatile! clojure.lang.PersistentQueue/EMPTY)
log (fn [q x] (or (some-> q (conj x)) (reduced clojure.lang.PersistentQueue/EMPTY)))
rf1 (xform1 log)
rf2 (xform2 log)]
(fn
([] (rf))
([acc] ; todo flush queues
(rf acc))
([acc x]
(let [q1 (rf1 @vq1 x)
q2 (rf2 @vq2 x)]
; todo: think about reduced
(cond->
(loop [acc acc
q1' (if (reduced? q1)
(if (seq @q1)
@q1
(into @q1 (x/for [[k _] %] [k nil]) (unreduced q2)))
q1)
q2' (if (reduced? q2)
(if (seq @q2)
@q2
(into @q2 (x/for [[k _] %] [k nil]) (unreduced q1)))
q2)]
(if (reduced? acc)
acc
(if-some [[k1 x1] (peek q1')]
(if-some [[k2 x2] (peek q2')]
(let [cmp (compare k1 k2)]
(cond
(neg? cmp) (recur (rf acc [k1 [x1 nil]]) (pop q1') q2')
(pos? cmp) (recur (rf acc [k2 [nil x2]]) q1' (pop q2'))
:else (recur (rf acc [k1 [x1 x2]]) (pop q1') (pop q2'))))
(do
(vreset! vq1 q1')
(vreset! vq2 (when-not (reduced? q2) q2'))
acc))
(do
(vreset! vq1 (when-not (reduced? q1) q1'))
(vreset! vq2 q2')
acc))))
(and (reduced? q1) (reduced? q2)) ensure-reduced)))))))
([xform1 xform2 & xforms3+]
(reduce
(fn [jxf xf]
(comp
(join jxf xf)
(x/for [[k [v x]] %] [k (conj v x)])))
(join xform1 xform2) xforms3+)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment