Last active
February 4, 2021 12:18
-
-
Save mcbuddha/9b41ccea9e43efaa9e4fde2a5095115a to your computer and use it in GitHub Desktop.
stonks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env nix-shell
#! nix-shell -p clojure -i "clj -Sdeps '{:deps {aleph/aleph {:mvn/version \"0.4.6\"} org.clojure/data.json {:mvn/version \"1.0.0\"} org.clojure/core.async {:mvn/version \"1.3.610\"}}}' -M"
(ns stonks
  (:require [aleph.http :as http]
            [clojure.core.async :as async]
            [clojure.data.json :as json]
            [clojure.string :as str]
            [manifold.stream :as s]))
(def stonk
  "Stateful transducer that keeps the latest update per `etime` bucket.

  Each input is destructured as `[_ [_ etime ...] ...]` and replaces any
  entry previously stored under the same `etime`. After every input the
  whole accumulated state (a map of `etime` -> latest input) is handed to
  the downstream reducing function, so one output is produced per input.

  The state is a sorted map, so its entries always iterate in ascending
  key order (lexicographic for string keys) no matter how many buckets
  exist or in which order updates arrive. All `etime` keys must therefore
  be mutually comparable (e.g. all strings or all numbers)."
  (fn [rf]
    ;; A fresh state is created for every application of the transducer.
    (let [state (volatile! (sorted-map))]
      (fn
        ;; Init arity: delegate to the downstream reducing function.
        ([] (rf))
        ;; Completion arity: nothing is buffered here, so just complete
        ;; the downstream reducing function.
        ([result] (rf result))
        ;; Step arity: overwrite the bucket for this `etime` with `input`,
        ;; then emit the updated state. `vswap!` returns the new value.
        ([result [_ [_ etime] :as input]]
         (rf result (vswap! state assoc etime input)))))))
(def chan
  "core.async channel with a fixed buffer of 10. Every value put on it is
  parsed as JSON, dropped unless the parsed value is a vector, and then
  fed through `stonk`."
  (let [xform (comp (map json/read-str)
                    (filter vector?)
                    stonk)]
    (async/chan 10 xform)))
;; Open the websocket; block until the connection is established.
(def sock
  (deref (http/websocket-client "wss://ws.kraken.com")))

;; Pipe every message received on the websocket into `chan`.
(s/connect sock chan)

;; Send the subscribe event and block until the put has completed.
(let [subscription {:event "subscribe"
                    :pair ["XBT/EUR"]
                    :subscription {:name "ohlc"}}]
  (deref (s/put! sock (json/write-str subscription))))
;; Write the last state for every message | |
(loop []
  ;; Block for the next accumulated state; stop looping once `chan`
  ;; is closed and `<!!` returns nil.
  (when-let [state (async/<!! chan)]
    (->> state
         ;; Each entry is [etime input]; pull [et o h l c] out of the
         ;; nested payload vector of `input`.
         (map (fn [[_ [_ [_ et o h l c]]]] [et o h l c]))
         (map (partial str/join "\t"))   ; tab-separated columns
         (str/join "\n")                 ; one row per line
         (spit "stonks.dat"))            ; overwrite the data file
    ;; Tell gnuplot (reading our STDOUT) to redraw from the new file.
    (println "replot")
    (recur)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment