@mccraigmccraig
Created November 3, 2017 09:56
(defn cross-streams
  "Join values from some sorted streams onto a new sorted stream.

   Given keyed streams in {skey stream}, values will be taken from the head of
   each stream, and the values with the lowest key value (keys being extracted
   with -key and compared with key-comparator-fn) will be passed to selector-fn
   as:

     (selector-fn {skey head-value})

   selector-fn returns a list of skeys to identify the values to be combined
   into the output value by reducing with:

     (reduce #(-merge source-stream output-value [skey head-value])
             init-output-value
             selected-skey-head-values)

   This repeats until all streams are drained.

   Different selector-fns and ISortedStream impls can give various
   behaviours, such as sort-merge or join."
  ([selector-fn init-output-value skey-streams]
   (cross-streams compare selector-fn init-output-value skey-streams))
  ([key-comparator-fn selector-fn init-output-value skey-streams]
   ;; (info "cross-streams" {:key-comparator-fn key-comparator-fn
   ;;                        :selector-fn selector-fn
   ;;                        :init-output-value init-output-value
   ;;                        :skey-streams skey-streams})
   (let [dst (s/stream)
         skey-intermediates (->>
                             (for [[sk s] skey-streams]
                               [sk (intermediate-stream dst s)])
                             (into {}))]
     (pr/catch
      (fn [e]
        (warn e "error crossing the streams. what did you expect?")
        (doseq [s (concat [dst]
                          (vals skey-streams)
                          (vals skey-intermediates))]
          (try (-close! s)
               (catch Exception x
                 (warn x "error closing errored streams"))))
        (throw e))
      (d/loop [skey-next-vals (head-values skey-intermediates)]
        (d/chain'
         skey-next-vals
         (fn [sk-nvs]
           ;; (info "nvs" nvs)
           (next-output-value
            key-comparator-fn
            selector-fn
            init-output-value
            skey-intermediates
            sk-nvs))
         (fn [[ov sk-nvs]]
           ;; (info "ov-nvs" ov nvs)
           (if (= ::drained ov)
             (do (s/close! dst) false)
             (do
               (s/put! dst ov)
               (d/recur sk-nvs)))))))
     (d/success-deferred
      (s/source-only
       dst)))))
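
The docstring above describes the crossing loop in terms of helpers that this gist does not show (`head-values`, `next-output-value`, `intermediate-stream`, `-key`, `-merge`). As a rough illustration only, here is a synchronous sketch of the same idea over plain sorted seqs rather than Manifold streams. Everything in it is an assumption made for the example: `cross-seqs` is a hypothetical name, `key-fn` stands in for `-key`, `merge-fn` stands in for `-merge`, keys are compared with plain `compare`, and the rule that only the selected streams are advanced is a guess at what `next-output-value` does.

```clojure
;; Hypothetical, synchronous sketch of the crossing loop over plain sorted seqs.
;; key-fn and merge-fn are stand-ins for the gist's -key and -merge.
;; Assumption: selector-fn always returns at least one skey, and only the
;; selected seqs are advanced after each output value.
(defn cross-seqs
  [key-fn merge-fn selector-fn init-output-value skey-seqs]
  (loop [seqs (into {} (filter (comp seq val)) skey-seqs) ; drop empty inputs
         out  []]
    (if (empty? seqs)
      out
      (let [;; head value of every seq that still has values
            heads     (into {} (map (fn [[sk s]] [sk (first s)])) seqs)
            ;; lowest key among the heads
            lowest    (first (sort compare (map key-fn (vals heads))))
            ;; only the heads carrying the lowest key are offered to selector-fn
            min-heads (into {}
                            (filter (fn [[_ v]] (zero? (compare (key-fn v) lowest))))
                            heads)
            selected  (selector-fn min-heads)
            ;; fold the selected head values into one output value
            ov        (reduce (fn [ov sk] (merge-fn ov [sk (get min-heads sk)]))
                              init-output-value
                              selected)
            ;; advance the selected seqs, removing any that are now drained
            remaining (reduce (fn [m sk]
                                (let [r (rest (get m sk))]
                                  (if (seq r) (assoc m sk r) (dissoc m sk))))
                              seqs
                              selected)]
        (recur remaining (conj out ov))))))

;; Sort-merge: select a single skey per step, output is the head value itself.
(def sort-merged
  (cross-seqs identity
              (fn [_ [_ v]] v)
              (fn [heads] [(first (sort (keys heads)))])
              nil
              {:a [1 3 5] :b [2 3 6]}))
;; => [1 2 3 3 5 6]

;; Full outer join on :id: select every skey sharing the lowest key.
(def joined
  (cross-seqs :id
              (fn [ov [sk v]] (assoc ov sk v))
              keys
              {}
              {:users  [{:id 1 :name "ann"} {:id 2 :name "bob"}]
               :orders [{:id 2 :total 10}]}))
;; => [{:users {:id 1 :name "ann"}}
;;     {:users {:id 2 :name "bob"} :orders {:id 2 :total 10}}]
```

The two calls differ only in the selector and merge functions, which is the point the docstring makes about getting sort-merge or join behaviour from the same loop. The real function does this asynchronously with `d/loop` and puts each output value onto `dst` instead of accumulating a vector.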