-
-
Save mccraigmccraig/df19aa210d2ca3af0f2cb127f4f12bc3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn cross-streams | |
"join values from some sorted streams onto a new sorted stream | |
given keyed streams in {skey stream} values will be take from the head of | |
each stream, and the values with the lowest key value (keys being extracted with | |
-key and compared with key-comparator-fn) will be passed to selector-fn as: | |
(selector-fn {skey head-value}) | |
selector-fn returns a list of skeys to identify the values to be combined | |
into the output value by reducing with | |
(reduce #(-merge source-stream output-value [skey head-value]) | |
init-output-value | |
selected-skey-head-values) | |
repeat until all streams are drained | |
different selector-fns and ISortedStream impls can give various | |
behaviours, such as sort-merge or join" | |
([selector-fn init-output-value skey-streams] | |
(cross-streams compare selector-fn init-output-value skey-streams)) | |
([key-comparator-fn selector-fn init-output-value skey-streams] | |
;; (info "cross-streams" {:key-comparator-fn key-comparator-fn | |
;; :selector-fn selector-fn | |
;; :init-output-value init-output-value | |
;; :skey-streams skey-streams}) | |
(let [dst (s/stream) | |
skey-intermediates (->> | |
(for [[sk s] skey-streams] | |
[sk (intermediate-stream dst s)]) | |
(into {}))] | |
(pr/catch | |
(fn [e] | |
(warn e "error crossing the streams. what did you expect?" ) | |
(doseq [s (concat [dst] | |
(vals skey-streams) | |
(vals skey-intermediates))] | |
(try (-close! s) | |
(catch Exception x | |
(warn x "error closing errored streams")))) | |
(throw e)) | |
(d/loop [skey-next-vals (head-values skey-intermediates)] | |
(d/chain' | |
skey-next-vals | |
(fn [sk-nvs] | |
;; (info "nvs" nvs) | |
(next-output-value | |
key-comparator-fn | |
selector-fn | |
init-output-value | |
skey-intermediates | |
sk-nvs)) | |
(fn [[ov sk-nvs]] | |
;; (info "ov-nvs" ov nvs) | |
(if (= ::drained ov) | |
(do (s/close! dst) false) | |
(do | |
(s/put! dst ov) | |
(d/recur sk-nvs))))))) | |
(d/success-deferred | |
(s/source-only | |
dst))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment