@zerg000000
Last active September 20, 2019 02:25
OHLC in Clojure
{:deps {net.cgrand/xforms      {:mvn/version "0.19.0"}
        org.clojure/clojure    {:mvn/version "1.10.1"}
        org.clojure/core.async {:mvn/version "0.4.500"}}}
;; dataset
(def data [{:ts #inst "2019-01-01T12:00:00.125Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.250Z" :price 12.0}
           {:ts #inst "2019-01-01T12:00:00.375Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.500Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.625Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.750Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:00.875Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.000Z" :price 12.0}
           {:ts #inst "2019-01-01T12:00:01.125Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.250Z" :price 8.0}
           {:ts #inst "2019-01-01T12:00:01.375Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.500Z" :price 1.0}
           {:ts #inst "2019-01-01T12:00:01.625Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.750Z" :price 10.0}
           {:ts #inst "2019-01-01T12:00:01.875Z" :price 7.0}
           {:ts #inst "2019-01-01T12:00:02.000Z" :price 11.0}])
;; require transducer library
(require '[net.cgrand.xforms :as x])
(require '[net.cgrand.xforms.rfs :as rf])
;; I want to window by second, so convert each tick's
;; timestamp to whole seconds since the epoch
(defn ts->sec [x]
  (quot (.getTime ^java.util.Date (:ts x)) 1000))
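;; rf/some keeps the first non-nil value it sees, i.e. the opening price
(def first-xf (x/reduce rf/some))

;; compute all four aggregates in a single pass over the prices
(def price-xf
  (comp (map :price)
        (x/transjuxt {:min   x/min
                      :max   x/max
                      :open  first-xf
                      :close x/last})))

(defn ticks->ohlc [ticks]
  (x/into [] price-xf ticks))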
;; Actually, we can use a non-transducer version for this fn.
;; min and max take their arguments individually, so use apply.
#_(defn ticks->ohlc [ticks]
    (let [prices (mapv :price ticks)]
      [{:min   (apply min prices)
        :max   (apply max prices)
        :open  (first prices)
        :close (last prices)}]))
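;; group ticks by second, then turn each group into one OHLC map
(def xf
  (comp (partition-by ts->sec)
        (mapcat ticks->ohlc)))

(into [] xf data)
;; => [{:min 10.0, :max 12.0, :open 10.0, :close 10.0}
;;     {:min 1.0, :max 12.0, :open 12.0, :close 7.0}
;;     {:min 11.0, :max 11.0, :open 11.0, :close 11.0}]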
;; it could be reused in stream based processing
(require '[clojure.core.async :as ac])
;; attach transducer to a stream
(def ch (ac/chan 1000 xf))
;; create an async loop to prn the output of the stream
(ac/go-loop []
  (when-let [v (ac/<! ch)]
    (prn v)
    (recur)))
;; put values into the stream as you want
(doseq [x data]
  (ac/>!! ch x))
;; notice the last result is missing: partition-by buffers the current
;; partition until the key changes, so you have to close the stream
;; to flush the last result
(ac/close! ch)
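
Why the last result only shows up after closing: a minimal sketch, using only clojure.core, that drives a partition-by transducer by hand. The names emitted, record-rf and step are made up for this illustration and are not part of the gist. Closing a channel created with a transducer calls the same one-argument completion step that is invoked manually here.

```clojure
;; Everything that reaches the downstream reducing fn is recorded here.
(def emitted (atom []))

;; Hypothetical downstream reducing fn: records each partition it receives.
(defn record-rf
  ([acc] acc)                                    ; completion arity
  ([acc part] (swap! emitted conj part) acc))    ; step arity

;; Apply the transducer to the reducing fn to get a stateful step fn.
(def step ((partition-by even?) record-rf))

(step nil 2)
(step nil 4)
(def after-same-key @emitted)    ; 2 and 4 are buffered, nothing emitted yet

(step nil 5)
(def after-key-change @emitted)  ; key changed, so [2 4] was emitted

(step nil)                       ; completion, like closing the channel
(def after-completion @emitted)  ; the buffered [5] is flushed
```

The partition containing 5 is only emitted by the completion call, which is why the OHLC map for the last second stays hidden until the channel is closed.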
@rlhk
rlhk commented Sep 19, 2019

The result is the same if (def first-xf rf/some) is used instead of (def first-xf (x/reduce rf/some)), even though rf/some is not a transducer. I'm not sure whether there is any performance difference when the input coll is large.

@zerg000000
Author

The answer could be yes or no. A transducer is a function (reducing-fn) -> reducing-fn, something like middleware in Ring. rf/some is a reducing-fn, something like a handler in Ring.
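
To make the distinction concrete, here is a small sketch using only clojure.core (not xforms). It uses + as the reducing fn and (map inc) as the transducer; the var names are made up for illustration.

```clojure
;; + is a reducing fn: it takes an accumulator and an input, returns an accumulator.
(def plain-sum (reduce + 0 [1 2 3]))

;; (map inc) is a transducer: applied to a reducing fn, it returns a new reducing fn.
(def inc-then-add ((map inc) +))

;; The wrapped reducing fn can be used directly with reduce...
(def wrapped-sum (reduce inc-then-add 0 [1 2 3]))

;; ...or transduce can do the wrapping itself.
(def transduced-sum (transduce (map inc) + 0 [1 2 3]))
```

Here plain-sum is 6, while wrapped-sum and transduced-sum are both 9, because each input is incremented before it reaches +.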
