(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async]
          [clojure.pprint]
          [clojure.set :refer [subset?]]
          [clojure.tools.logging :as log])

(defn has-all-lists? [averages-map]
  (subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))

(defn join-averages []
  (let [state (atom {})]
    (fn [rf]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid (:uuid input)
               entry (cond
                       (:last-trade-price-exponential input) {:ema-list input}
                       (:last-trade-price-average input) {:sma-list input}
                       (:last-trade-price input) {:tick-list input})]
           (if-let [current (get @state uuid)]
             (let [_ (swap! state update-in [uuid] merge entry)
                   c' (get @state uuid)]
               (log/info "c'" c')
               (log/info "accumulator" accumulator)
               (log/info "state" (with-out-str (clojure.pprint/pprint @state)))
               (if (has-all-lists? c')
                 c'
                 (rf accumulator input)))
             (do (swap! state merge {uuid entry})
                 (rf accumulator input)))))))))

(comment
  (let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
                  {:uuid "2" :last-trade-price-exponential 11}
                  {:uuid "3" :last-trade-price-exponential 12}]
        sma-list [{:uuid "1" :last-trade-price-average 10.1}
                  {:uuid "2" :last-trade-price-average 10.2}
                  {:uuid "3" :last-trade-price-average 10.3}]
        tick-list [{:uuid "1" :last-trade-price 11.1}
                   {:uuid "2" :last-trade-price 11.2}
                   {:uuid "3" :last-trade-price 11.3}]
        ec (chan (sliding-buffer 100))
        sc (chan (sliding-buffer 100))
        tc (chan (sliding-buffer 100))
        _ (onto-chan ec ema-list)
        _ (onto-chan sc sma-list)
        _ (onto-chan tc tick-list)
        merged-ch (async/merge [tc sc ec])
        output-ch (chan (sliding-buffer 100) (join-averages))]
    (async/pipeline 1 output-ch (join-averages) merged-ch)
    (go-loop [r (<! output-ch)]
      (when-not (nil? r)
        (log/info "record" r)
        (recur (<! output-ch))))))

twashing commented May 28, 2018

Running the let block shows (in the c' log lines) that the values are being joined correctly, but the records read from output-ch still don't contain the joined value.

Why is the join-averages stateful transducer not returning the correct result?

...
c' {:tick-list {:uuid 1, :last-trade-price 11.1}, :ema-list {:uuid 1, :last-trade-price-exponential 10}, :sma-list {:uuid 1, :last-trade-price-average 10.1}}
record {:uuid 1, :last-trade-price 11.1}
c' {:tick-list {:uuid 2, :last-trade-price 11.2}, :ema-list {:uuid 2, :last-trade-price-exponential 11}, :sma-list {:uuid 2, :last-trade-price-average 10.2}}
record {:uuid 2, :last-trade-price 11.2}
...
c' {:tick-list {:uuid 3, :last-trade-price 11.3}, :ema-list {:uuid 3, :last-trade-price-exponential 12}, :sma-list {:uuid 3, :last-trade-price-average 10.3}}
record {:uuid 1, :last-trade-price-exponential 10}
record {:uuid 2, :last-trade-price-exponential 11}
record {:uuid 3, :last-trade-price 11.3}
record {:uuid 3, :last-trade-price-exponential 12}
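
One possible reading of the log above, offered as a sketch rather than a confirmed diagnosis: when all three lists are present, the step function returns c' directly instead of passing it to rf, so the joined map is never handed downstream, while the incomplete branches call (rf accumulator input) and forward the raw input. The version below assumes the intent is to emit only the joined map once a uuid has all three entries. It is exercised with `into` on a plain vector so that it runs without core.async; the `inputs` and `joined` names are illustrative and not part of the original gist.

```clojure
(require '[clojure.set :refer [subset?]])

(defn has-all-lists? [averages-map]
  (subset? #{:tick-list :sma-list :ema-list} (set (keys averages-map))))

(defn join-averages
  "Sketch of a join that emits a value only once a uuid has a tick, an SMA
   and an EMA entry. The atom stays outside (fn [rf] ...), as in the gist."
  []
  (let [state (atom {})]
    (fn [rf]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid   (:uuid input)
               entry  (cond
                        (:last-trade-price-exponential input) {:ema-list input}
                        (:last-trade-price-average input)     {:sma-list input}
                        (:last-trade-price input)             {:tick-list input})
               ;; merge the new entry into whatever is stored for this uuid
               joined (get (swap! state update uuid merge entry) uuid)]
           (if (has-all-lists? joined)
             ;; complete: drop the stored state and emit the joined map via rf
             (do (swap! state dissoc uuid)
                 (rf accumulator joined))
             ;; incomplete: emit nothing, hand the accumulator back unchanged
             accumulator)))))))

;; Illustrative input: uuid "1" is complete, uuid "2" is not.
(def inputs
  [{:uuid "1" :last-trade-price 11.1}
   {:uuid "1" :last-trade-price-average 10.1}
   {:uuid "2" :last-trade-price 11.2}
   {:uuid "1" :last-trade-price-exponential 10}])

(def joined (into [] (join-averages) inputs))
```

In the channel setup, the transducer is also attached twice, once on output-ch and once in async/pipeline, and each (join-averages) call keeps its own state, so attaching it in a single place would likely be needed as well.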