Skip to content

Instantly share code, notes, and snippets.

@zerg000000
Last active September 20, 2019 02:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zerg000000/2006009abefeac47a5ac82f38a55a8cb to your computer and use it in GitHub Desktop.
Save zerg000000/2006009abefeac47a5ac82f38a55a8cb to your computer and use it in GitHub Desktop.
OHLC in Clojure
{:deps {net.cgrand/xforms {:mvn/version "0.19.0"}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/core.async {:mvn/version "0.4.500"}}}
;; dataset
(def data [{:ts #inst "2019-01-01T12:00:00.125Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:00.250Z" :price 12.0}
{:ts #inst "2019-01-01T12:00:00.375Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:00.500Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:00.625Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:00.750Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:00.875Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:01.000Z" :price 12.0}
{:ts #inst "2019-01-01T12:00:01.125Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:01.250Z" :price 8.0}
{:ts #inst "2019-01-01T12:00:01.375Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:01.500Z" :price 1.0}
{:ts #inst "2019-01-01T12:00:01.625Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:01.750Z" :price 10.0}
{:ts #inst "2019-01-01T12:00:01.875Z" :price 7.0}
{:ts #inst "2019-01-01T12:00:02.000Z" :price 11.0}])
;; require transducer library
(require '[net.cgrand.xforms :as x])
(require '[net.cgrand.xforms.rfs :as rf])
;; since I want to window by second
;; timestamp to sec
(defn ts->sec [x]
(int (/ (-> x :ts (.getTime))
1000.0)))
(def first-xf (x/reduce rf/some))
(def price-xf
(comp (map :price)
(x/transjuxt {:min x/min
:max x/max
:open first-xf
:close x/last})))
(defn ticks->ohlc [ticks]
(x/into [] price-xf ticks))
; Actually, we can use non-transducer version for this fn
#_(defn ticks->ohlc [ticks]
(let [prices (mapv :price ticks)]
[{:min (min prices)
:max (max prices)
:open (first prices)
:close (last prices)}]))
(def xf
(comp (partition-by ts->sec)
(mapcat ticks->ohlc)))
(into [] xf data)
;; it could be reused in stream based processing
(require '[clojure.core.async :as ac])
;; attach transducer to a stream
(def ch (ac/chan 1000 xf))
;; create a async loop to prn output of stream
(ac/go-loop []
(when-let [v (ac/<! ch)]
(prn v)
(recur)))
;; put values into the stream as you want
(doseq [x data]
(ac/>!! ch x))
;; notice the last one is missing
;; you have to close the stream to see the last result
(ac/close! ch)
@rlhk
Copy link

rlhk commented Sep 18, 2019

line 43 should be: (into [] xf data).

@rlhk
Copy link

rlhk commented Sep 18, 2019

curious, line 35: does that mean ... (map first) won't work?

@zerg000000
Copy link
Author

zerg000000 commented Sep 18, 2019

curious, line 35: does that mean ... (map first) won't work?

(map first) have a different meaning

(into [] (map first) [ [1] [2] [3] [4] [5] ])
[1 2 3 4 5]

using

(x/reduce rf/some) would be more clear?

@rlhk
Copy link

rlhk commented Sep 18, 2019

Interestingly, the following fn gives the same result

(defn ticks->ohlc [ticks]
  (into []
        (comp (map :price)
              (x/transjuxt {:min x/min
                            :max x/max
                            :open identity
                            :close x/last}))
        ticks))

Say the following is true

(=
  (into [] (x/transjuxt [identity x/max]) [3 4 5])
  (into [] (x/transjuxt [(map identity) x/max]) [3 4 5]))

@zerg000000
Copy link
Author

Interestingly, the following fn gives the same result

(defn ticks->ohlc [ticks]
  (into []
        (comp (map :price)
              (x/transjuxt {:min x/min
                            :max x/max
                            :open identity
                            :close x/last}))
        ticks))

Say the following is true

(=
  (into [] (x/transjuxt [identity x/max]) [3 4 5])
  (into [] (x/transjuxt [(map identity) x/max]) [3 4 5]))

After some thinking, identity actually match the definition of transducer.

The definition of transducer -- a function that take reducing-fn return a function as reducing-fn

(fn identity [reducing-fn]
  ([] (reducing-fn))
  ([acc] (reducing-fn acc))
  ([acc item] (reducing-fn acc item)))

@rlhk
Copy link

rlhk commented Sep 19, 2019

Result is the same if (def first-xf rf/some) is used instead of (def first-xf (x/reduce rf/some)), though rf/some is not a transducer. Not sure if there's any performance difference if input Coll is large.

@zerg000000
Copy link
Author

the answer could be yes or not, transducer is (reducing-fn) -> reducing-fn, something like middleware in ring. rf/some is a reducing-fn, something like handler in ring.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment