Skip to content

Instantly share code, notes, and snippets.

@wagjo
Last active December 21, 2015 16:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wagjo/6335189 to your computer and use it in GitHub Desktop.
Save wagjo/6335189 to your computer and use it in GitHub Desktop.
lazy seq and interleave from reducers with core.async
(defn lazy-seq*
  "Returns a lazy sequence of the items produced by reducing `reducible`.

  The reduction runs on a separate core.async thread and hands items over
  one at a time through an unbuffered channel, so the producer only
  advances as the returned sequence is consumed.

  nil items are supported: they are swapped for a private sentinel while in
  transit, because nil cannot be put on a channel and a nil take means the
  channel is closed.

  If the reduction throws, the channel is still closed, so the returned
  sequence ends at that point instead of blocking its consumer forever.
  The exception itself stays on the producer thread.

  Note: if the sequence is abandoned before it is exhausted, the producer
  thread remains parked on its pending put."
  [reducible]
  (let [c (chan)
        ;; unique marker standing in for nil items on the channel
        NIL (Object.)
        encode-nil #(if (nil? %) NIL %)
        decode-nil #(if (identical? NIL %) nil %)
        ;; the accumulator is irrelevant; reducing is only used to push items
        reduce-fn (fn [r v] (>!! c (encode-nil v)))]
    (thread
      (try
        (reduce reduce-fn nil reducible)
        nil
        (finally
          ;; always signal end-of-input, even when the reduction fails
          (close! c))))
    (->> #(<!! c)
         repeatedly
         ;; a nil take means the channel was closed: no more items
         (take-while (complement nil?))
         (map decode-nil))))
(defn interleave*
  "Returns a reducible that yields the first item of each coll, then the
  second item of each, and so on.

  Every coll is reduced on its own core.async thread, which feeds an
  unbuffered channel; the consumer takes from the channels in round-robin
  order. nil items are supported via a private sentinel, since nil cannot
  travel through a channel.

  The reduction stops as soon as a channel reports that its coll is
  exhausted, or when the reducing function returns a `reduced` value.
  Unlike clojure.core/interleave, items already folded in from an
  incomplete final round are kept.

  With no colls, reducing returns the initial value unchanged.

  When the reduction finishes for any reason, all channels are closed and
  drained so that the producer threads terminate instead of staying parked
  on a pending put. This relies on `>!!` returning false for a closed
  channel."
  [& colls]
  (reify clojure.core.protocols.CollReduce
    (coll-reduce [this f1]
      (clojure.core.protocols/coll-reduce this f1 (f1)))
    (coll-reduce [_ f1 init]
      (let [;; unique marker standing in for nil items on the channels
            NIL (Object.)
            encode-nil #(if (nil? %) NIL %)
            decode-nil #(if (identical? NIL %) nil %)
            send-fn (fn [coll]
                      (let [c (chan)]
                        (thread
                          (try
                            (reduce (fn [_ v]
                                      ;; a false put means the consumer has
                                      ;; closed the channel: stop producing
                                      (if (>!! c (encode-nil v))
                                        nil
                                        (reduced nil)))
                                    nil
                                    coll)
                            nil
                            (finally
                              ;; signal end-of-input even if coll throws
                              (close! c))))
                        c))
            chans (doall (map send-fn colls))]
        (try
          (if (empty? chans)
            ;; nothing to interleave; cycling an empty seq would yield a
            ;; nil channel and make the take below throw
            init
            (loop [val init
                   s (cycle chans)]
              (let [x (<!! (first s))]
                (if (nil? x)
                  ;; this channel is closed: its coll is exhausted
                  val
                  (let [ret (f1 val (decode-nil x))]
                    (if (reduced? ret)
                      @ret
                      (recur ret (rest s))))))))
          (finally
            ;; Closing does not release a put that is already pending, so
            ;; take once more after closing. A take on a closed channel
            ;; never blocks; the released producer's next put then fails
            ;; and it stops.
            (doseq [c chans]
              (close! c)
              (<!! c))))))))
;; Usage examples.

;; An infinite lazy sequence 1, 2, 3, ... backed by a reducer pipeline.
(def s
  (->> (range)
       (clojure.core.reducers/map inc)
       lazy-seq*))

;; Realizes only the first item.
(first s)

;; Lazily takes the first hundred items.
(take 100 s)

;; NOTE: s is infinite, so printing this form's value at a REPL does not
;; terminate unless *print-length* is bound.
s

;; Sums the first hundred items of two interleaved infinite reducibles.
(->> (interleave* (range)
                  (clojure.core.reducers/map inc (range)))
     (clojure.core.reducers/take 100)
     (reduce +))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment