Skip to content

Instantly share code, notes, and snippets.

@mcbuddha
Last active February 4, 2021 12:18
Show Gist options
  • Save mcbuddha/9b41ccea9e43efaa9e4fde2a5095115a to your computer and use it in GitHub Desktop.
Save mcbuddha/9b41ccea9e43efaa9e4fde2a5095115a to your computer and use it in GitHub Desktop.
stonks
#! /usr/bin/env nix-shell
#! nix-shell -p clojure -i "clj -Sdeps '{:deps {aleph/aleph {:mvn/version \"0.4.6\"} org.clojure/data.json {:mvn/version \"1.0.0\"} org.clojure/core.async {:mvn/version \"1.3.610\"}}}' -M"
(ns stonks
  (:require [aleph.http :as http]
            [clojure.core.async :as async]
            [clojure.data.json :as json]
            [clojure.string]
            [manifold.stream :as s]))
(def stonk
  "Stateful transducer that keeps the most recent input per `etime` bucket.

  Each input is destructured as `[_ [_ etime ...] ...]`; the whole input is
  stored under its `etime`, replacing any earlier input for the same bucket.
  For every input, the complete bucket map is passed downstream, so consumers
  always see a full snapshot rather than a delta.

  The bucket map is created per application of the transducer to a reducing
  function, so separate transductions do not share state."
  (fn [rf]
    (let [buckets (volatile! {})]
      (fn
        ;; Init arity: delegate to the downstream reducing function.
        ([] (rf))
        ;; Completion arity: nothing to flush here, just hand the accumulated
        ;; result to the downstream reducing function.
        ([acc] (rf acc))
        ;; Step arity: record `msg` under its `etime`, then emit the updated
        ;; snapshot. `vswap!` returns the new map, which is what goes downstream.
        ([acc msg]
         (let [[_ [_ etime]] msg]
           (rf acc (vswap! buckets assoc etime msg))))))))
(def chan
  "core.async channel (fixed buffer of 10) whose transducer turns raw JSON
  strings into `stonk` snapshots: parse the JSON, keep only values that parsed
  to a vector, then fold them into the per-`etime` state."
  (let [parse-json  (map json/read-str)
        arrays-only (filter vector?)
        pipeline    (comp parse-json arrays-only stonk)]
    (async/chan 10 pipeline)))
(def sock
  "Websocket client connection to wss://ws.kraken.com. The result of
  `http/websocket-client` is dereferenced, so loading this file waits for it."
  (deref (http/websocket-client "wss://ws.kraken.com")))

;; Pipe everything arriving on the websocket into `chan`.
(s/connect sock chan)

;; Send the subscribe event for the XBT/EUR "ohlc" feed and wait for the put
;; to complete.
(let [subscribe-msg {:event "subscribe"
                     :pair ["XBT/EUR"]
                     :subscription {:name "ohlc"}}]
  (deref (s/put! sock (json/write-str subscribe-msg))))
;; For every snapshot taken from `chan`, rewrite stonks.dat and then print
;; "replot" on STDOUT (intended to be piped into gnuplot).
;;
;; Each snapshot is a map of `etime` -> message. A plain Clojure map has no
;; guaranteed iteration order once it grows, so the entries are sorted by key
;; before being written; otherwise rows would land in the file in hash order.
;; NOTE(review): sorting uses the default `compare` on the keys. This is
;; chronological as long as the `etime` values are same-width strings (or
;; numbers) -- confirm against the feed's payload format.
;;
;; The loop ends when `chan` is closed (`<!!` returns nil).
(loop []
  (when-let [state (async/<!! chan)]
    (->> state                                              ; start with `state`
         (sort-by key)                                      ; oldest bucket first
         (map (fn [[_ [_ [_ et o h l c]]]] [et o h l c]))   ; rows of [et o h l c]
         (map (partial clojure.string/join "\t"))           ; join columns
         (clojure.string/join "\n")                         ; join rows
         (spit "stonks.dat"))                               ; write file
    (println "replot")                                      ; ask gnuplot to redraw
    (recur)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment