(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async]
[clojure.set :refer [subset?]]
[ :as log])
(defn has-all-lists? [averages-map]
(subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))
(defn join-averages []
(let [state (atom {})]
(fn [rf]
([] (rf))
([accumulator] (rf accumulator))
([accumulator input]
(let [uuid (:uuid input)
entry (cond
(:last-trade-price-exponential input) {:ema-list input}
(:last-trade-price-average input) {:sma-list input}
(:last-trade-price input) {:tick-list input})]
(if-let [current (get @state uuid)]
(let [_ (swap! state update-in [uuid] merge entry)
c' (get @state uuid)]
(log/info "c'" c')
(log/info "accumulator" accumulator)
(log/info "state" (with-out-str (clojure.pprint/pprint @state)))
(if (has-all-lists? c')
(rf accumulator input)))
(do (swap! state merge {uuid entry})
(rf accumulator input)))))))))
(let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
{:uuid "2" :last-trade-price-exponential 11}
{:uuid "3" :last-trade-price-exponential 12}]
sma-list [{:uuid "1" :last-trade-price-average 10.1}
{:uuid "2" :last-trade-price-average 10.2}
{:uuid "3" :last-trade-price-average 10.3}]
tick-list [{:uuid "1" :last-trade-price 11.1}
{:uuid "2" :last-trade-price 11.2}
{:uuid "3" :last-trade-price 11.3}]
ec (chan (sliding-buffer 100))
sc (chan (sliding-buffer 100))
tc (chan (sliding-buffer 100))
_ (onto-chan ec ema-list)
_ (onto-chan sc sma-list)
_ (onto-chan tc tick-list)
merged-ch (async/merge [tc sc ec])
output-ch (chan (sliding-buffer 100) (join-averages))]
(async/pipeline 1 output-ch (join-averages) merged-ch)
(go-loop [r (<! output-ch)]
(when-not (nil? r)
(log/info "record" r)
(recur (<! output-ch))))))

@twashing twashing commented May 28, 2018

Running the let block shows me that I'm correctly joining values. But the result record still doesn't have the joined value.

Why is the join-averages stateful transducer, not returning the correct result?

c' {:tick-list {:uuid 1, :last-trade-price 11.1}, :ema-list {:uuid 1, :last-trade-price-exponential 10}, :sma-list {:uuid 1, :last-trade-price-average 10.1}}
record {:uuid 1, :last-trade-price 11.1}
c' {:tick-list {:uuid 2, :last-trade-price 11.2}, :ema-list {:uuid 2, :last-trade-price-exponential 11}, :sma-list {:uuid 2, :last-trade-price-average 10.2}}
record {:uuid 2, :last-trade-price 11.2}
c' {:tick-list {:uuid 3, :last-trade-price 11.3}, :ema-list {:uuid 3, :last-trade-price-exponential 12}, :sma-list {:uuid 3, :last-trade-price-average 10.3}}
record {:uuid 1, :last-trade-price-exponential 10}
record {:uuid 2, :last-trade-price-exponential 11}
record {:uuid 3, :last-trade-price 11.3}
record {:uuid 3, :last-trade-price-exponential 12}
