{:deps {net.cgrand/xforms {:mvn/version "0.19.0"}
        org.clojure/clojure {:mvn/version "1.10.1"}
        org.clojure/core.async {:mvn/version "0.4.500"}}}
;; dataset
(def data [{:ts #inst "2019-01-01T12:00:00.125Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.250Z" :price 12.0}
           {:ts #inst "2019-01-01T12:00:00.375Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.500Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.625Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.750Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.875Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.000Z" :price 12.0}
           {:ts #inst "2019-01-01T12:00:01.125Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.250Z" :price 8.0}
           {:ts #inst "2019-01-01T12:00:01.375Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.500Z" :price 1.0}
           {:ts #inst "2019-01-01T12:00:01.625Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.750Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.875Z" :price 7.0}
           {:ts #inst "2019-01-01T12:00:02.000Z" :price 11.0}])
;; require transducer library
(require '[net.cgrand.xforms :as x])
(require '[net.cgrand.xforms.rfs :as rf])
;; since I want to window by second
;; timestamp to sec
(defn ts->sec [x]
  (int (/ (-> x :ts (.getTime))
          1000.0)))
(def first-xf (x/reduce rf/some))
(def price-xf
  (comp (map :price)
        (x/transjuxt {:min x/min
                      :max x/max
                      :open first-xf
                      :close x/last})))
(defn ticks->ohlc [ticks]
  (x/into [] price-xf ticks))
;; Actually, we can use a non-transducer version for this fn
#_(defn ticks->ohlc [ticks]
    (let [prices (mapv :price ticks)]
      ;; min and max are variadic, so apply them to the vector
      [{:min (apply min prices)
        :max (apply max prices)
        :open (first prices)
        :close (last prices)}]))
(def xf
  (comp (partition-by ts->sec)
        (mapcat ticks->ohlc)))
(into [] xf data)
;; it could be reused in stream-based processing
(require '[clojure.core.async :as ac])
;; attach the transducer to a stream
(def ch (ac/chan 1000 xf))
;; create an async loop to prn the output of the stream
(ac/go-loop []
  (when-let [v (ac/<! ch)]
    (prn v)
    (recur)))
;; put values into the stream as you want
(doseq [x data]
  (ac/>!! ch x))
;; notice the last one is missing
;; you have to close the stream to see the last result
(ac/close! ch)
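The missing last result comes from how `partition-by` buffers: it only emits a partition once it sees an item that starts the next one, and it flushes the final partition in its completion step. The sketch below shows that with plain `clojure.core` and no channel; `step` and `before-completion` are names made up for this illustration, and `odd?` stands in for `ts->sec`.

```clojure
;; Build the reducing fn by hand so the completion step is visible.
;; partition-by is stateful, so this rf is used for a single pass only.
(def step ((partition-by odd?) conj))

;; reduce only calls the 2-arity step fn; the last partition [3] stays buffered.
(def before-completion (reduce step [] [1 1 2 2 3]))

;; Calling the 1-arity (completion) flushes the buffered partition,
;; which is assumed to be what closing the channel triggers.
(def after-completion (step before-completion))

(prn before-completion) ;; [[1 1] [2 2]]
(prn after-completion)  ;; [[1 1] [2 2] [3]]
```

`into` and `transduce` call the completion step themselves, which is why `(into [] xf data)` returns every window without any extra call.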
Curious about line 35: does that mean (map first) won't work?
(map first) has a different meaning: it takes the first element of each input item, not the first item of the whole input.
(into [] (map first) [[1] [2] [3] [4] [5]])
;; => [1 2 3 4 5]
Would using (x/reduce rf/some) be clearer?
Interestingly, the following fn gives the same result:
(defn ticks->ohlc [ticks]
  (into []
        (comp (map :price)
              (x/transjuxt {:min x/min
                            :max x/max
                            :open identity
                            :close x/last}))
        ticks))
Say the following is true:
(=
  (into [] (x/transjuxt [identity x/max]) [3 4 5])
  (into [] (x/transjuxt [(map identity) x/max]) [3 4 5]))
After some thinking, identity actually matches the definition of a transducer.
The definition of a transducer: a function that takes a reducing fn and returns a reducing fn. Used that way, identity behaves like this:
(fn identity-xf [reducing-fn]
  (fn
    ([] (reducing-fn))
    ([acc] (reducing-fn acc))
    ([acc item] (reducing-fn acc item))))
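A quick way to check this with plain `clojure.core`: as a transducer, `identity` passes every item through, just like `(map identity)`. The last form rests on an assumption about xforms, namely that `x/transjuxt` caps each branch with `(take 1)`; if that holds, it would explain why `:open identity` ends up holding the first price. The var names are made up for this sketch.

```clojure
;; identity hands the downstream reducing fn back untouched,
;; so every item flows through unchanged.
(def via-identity (into [] identity [3 4 5]))
(def via-map-identity (into [] (map identity) [3 4 5]))

;; Assumption: a transjuxt branch behaves like (comp branch-xf (take 1)),
;; in which case a pass-through branch keeps only the first item.
(def capped (into [] (comp identity (take 1)) [3 4 5]))

(prn via-identity)     ;; [3 4 5]
(prn via-map-identity) ;; [3 4 5]
(prn capped)           ;; [3]
```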
The result is the same if (def first-xf rf/some) is used instead of (def first-xf (x/reduce rf/some)), even though rf/some is not a transducer. I'm not sure whether there is any performance difference when the input coll is large.
The answer could be yes or no. A transducer is (reducing-fn) -> reducing-fn, something like middleware in Ring. rf/some is a reducing fn, something like a handler in Ring.
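One possible reason a reducing fn can stand in for a transducer without an error: a transducer is called with a single argument, and a reducing fn's one-argument arity is its completion step, which usually returns its argument. The sketch below uses `some-rf`, a hypothetical stand-in written here with the arities `rf/some` is assumed to have; it is not the library's source.

```clojure
;; Hypothetical stand-in for rf/some (assumed shape, not copied from xforms).
(defn some-rf
  ([] nil)                           ; init
  ([acc] acc)                        ; completion: return the result as-is
  ([_ x] (when x (reduced x))))      ; step: stop at the first truthy value

;; Called like a transducer, only the 1-arity runs, and it hands the
;; downstream reducing fn straight back, the same as identity would.
(def same-fn? (identical? conj (some-rf conj)))

;; So in transducer position it passes every item through.
(def passed-through (into [] some-rf [3 4 5]))

(prn same-fn?)       ;; true
(prn passed-through) ;; [3 4 5]
```

Under that assumption the step arity never runs in transducer position, so any "first item" behaviour has to come from elsewhere in the pipeline.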
Line 43 should be:
(into [] xf data)