Created
May 28, 2018 23:14
-
-
Save twashing/2f3ee581d49161fa23533fd4680fbea8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async] | |
[clojure.set :refer [subset?]] | |
[clojure.tools.logging :as log]) | |
(defn has-all-lists? [averages-map] | |
(subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{})))) | |
(defn join-averages [] | |
(let [state (atom {})] | |
(fn [rf] | |
(fn | |
([] (rf)) | |
([accumulator] (rf accumulator)) | |
([accumulator input] | |
(let [uuid (:uuid input) | |
entry (cond | |
(:last-trade-price-exponential input) {:ema-list input} | |
(:last-trade-price-average input) {:sma-list input} | |
(:last-trade-price input) {:tick-list input})] | |
(if-let [current (get @state uuid)] | |
(let [_ (swap! state update-in [uuid] merge entry) | |
c' (get @state uuid)] | |
(log/info "c'" c') | |
(log/info "accumulator" accumulator) | |
(log/info "state" (with-out-str (clojure.pprint/pprint @state))) | |
(if (has-all-lists? c') | |
c' | |
(rf accumulator input))) | |
(do (swap! state merge {uuid entry}) | |
(rf accumulator input))))))))) | |
(comment | |
(let [ema-list [{:uuid "1" :last-trade-price-exponential 10} | |
{:uuid "2" :last-trade-price-exponential 11} | |
{:uuid "3" :last-trade-price-exponential 12}] | |
sma-list [{:uuid "1" :last-trade-price-average 10.1} | |
{:uuid "2" :last-trade-price-average 10.2} | |
{:uuid "3" :last-trade-price-average 10.3}] | |
tick-list [{:uuid "1" :last-trade-price 11.1} | |
{:uuid "2" :last-trade-price 11.2} | |
{:uuid "3" :last-trade-price 11.3}] | |
ec (chan (sliding-buffer 100)) | |
sc (chan (sliding-buffer 100)) | |
tc (chan (sliding-buffer 100)) | |
_ (onto-chan ec ema-list) | |
_ (onto-chan sc sma-list) | |
_ (onto-chan tc tick-list) | |
merged-ch (async/merge [tc sc ec]) | |
output-ch (chan (sliding-buffer 100) (join-averages))] | |
(async/pipeline 1 output-ch (join-averages) merged-ch) | |
(go-loop [r (<! output-ch)] | |
(when-not (nil? r) | |
(log/info "record" r) | |
(recur (<! output-ch)))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running the
let
block shows me that I'm correctly joining values. But the result record still doesn't have the joined value.Why is the
join-averages
stateful transducer, not returning the correct result?