@gszeliga
Created December 6, 2017 15:07
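;; Computes the mean of field `fname` over the records arriving on `channel`,
;; using `npar` aggregators that all read from the same value channel.
;; Returns a channel that yields the mean.
;; `extract-field-fn` and `w-aggregate` are defined elsewhere.
(defn aggregate-field [fname channel npar]
  (let [as-value-fn       (extract-field-fn fname channel)
        ;; Transform each record into its field value via a buffered channel.
        fvalue-chan       (pipe channel (chan 1024 (map as-value-fn)))
        fvalue-aggregator (for [_ (range npar)]
                            (w-aggregate fvalue-chan))]
    (go
      ;; Each aggregator is assumed to emit a [sum count] pair;
      ;; the pairs are added element-wise into a single total.
      (let [[sum count] (<! (async/reduce
                             #(mapv + %1 %2)
                             [0.0 0]
                             (async/merge fvalue-aggregator)))]
        (/ sum count)))))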