Last active
September 20, 2019 02:25
-
-
Save zerg000000/2006009abefeac47a5ac82f38a55a8cb to your computer and use it in GitHub Desktop.
OHLC in Clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{:deps {net.cgrand/xforms {:mvn/version "0.19.0"} | |
org.clojure/clojure {:mvn/version "1.10.1"} | |
org.clojure/core.async {:mvn/version "0.4.500"}}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; dataset | |
(def data [{:ts #inst "2019-01-01T12:00:00.125Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:00.250Z" :price 12.0} | |
{:ts #inst "2019-01-01T12:00:00.375Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:00.500Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:00.625Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:00.750Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:00.875Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:01.000Z" :price 12.0} | |
{:ts #inst "2019-01-01T12:00:01.125Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:01.250Z" :price 8.0} | |
{:ts #inst "2019-01-01T12:00:01.375Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:01.500Z" :price 1.0} | |
{:ts #inst "2019-01-01T12:00:01.625Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:01.750Z" :price 10.0} | |
{:ts #inst "2019-01-01T12:00:01.875Z" :price 7.0} | |
{:ts #inst "2019-01-01T12:00:02.000Z" :price 11.0}]) | |
;; require transducer library | |
(require '[net.cgrand.xforms :as x]) | |
(require '[net.cgrand.xforms.rfs :as rf]) | |
;; since I want to window by second | |
;; timestamp to sec | |
(defn ts->sec [x] | |
(int (/ (-> x :ts (.getTime)) | |
1000.0))) | |
(def first-xf (x/reduce rf/some)) | |
(def price-xf | |
(comp (map :price) | |
(x/transjuxt {:min x/min | |
:max x/max | |
:open first-xf | |
:close x/last}))) | |
(defn ticks->ohlc [ticks] | |
(x/into [] price-xf ticks)) | |
; Actually, we can use non-transducer version for this fn | |
#_(defn ticks->ohlc [ticks] | |
(let [prices (mapv :price ticks)] | |
[{:min (min prices) | |
:max (max prices) | |
:open (first prices) | |
:close (last prices)}])) | |
(def xf | |
(comp (partition-by ts->sec) | |
(mapcat ticks->ohlc))) | |
(into [] xf data) | |
;; it could be reused in stream based processing | |
(require '[clojure.core.async :as ac]) | |
;; attach transducer to a stream | |
(def ch (ac/chan 1000 xf)) | |
;; create a async loop to prn output of stream | |
(ac/go-loop [] | |
(when-let [v (ac/<! ch)] | |
(prn v) | |
(recur))) | |
;; put values into the stream as you want | |
(doseq [x data] | |
(ac/>!! ch x)) | |
;; notice the last one is missing | |
;; you have to close the stream to see the last result | |
(ac/close! ch) |
the answer could be yes or not, transducer is (reducing-fn) -> reducing-fn
, something like middleware in ring. rf/some
is a reducing-fn, something like handler in ring.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Result is the same if
(def first-xf rf/some)
is used instead of(def first-xf (x/reduce rf/some))
, thoughrf/some
is not a transducer. Not sure if there's any performance difference if input Coll is large.