Skip to content

Instantly share code, notes, and snippets.

@nnance
Created May 14, 2017 17:03
Show Gist options
  • Save nnance/ef7f3fbd68595d506bdcfed21a84907c to your computer and use it in GitHub Desktop.
Save nnance/ef7f3fbd68595d506bdcfed21a84907c to your computer and use it in GitHub Desktop.
GDAX WebSocket Rolling Stats
'use strict'
const Gdax = require('gdax')
const h = require('highland')
const streamStatistics = require('stream-statistics')
/**
 * Creates a new GDAX websocket client.
 *
 * @param {string[]} [markets=['BTC-USD']] - Passed straight to the
 *   `Gdax.WebsocketClient` constructor.
 * @returns {Object} The newly constructed client.
 */
const newClient = (markets = ['BTC-USD']) => {
  const client = new Gdax.WebsocketClient(markets)
  return client
}

// Default client, constructed as soon as this module is loaded; tickerStream
// falls back to it when no source is supplied.
const websocket = newClient()
/**
 * Wraps the `message` events of a websocket client in a highland stream.
 *
 * @param {Object} [s=websocket] - Event source; defaults to the module-level
 *   client.
 * @returns {Object} Highland stream of `message` payloads that stops on the
 *   first error.
 */
const tickerStream = exports.tickerStream = (s = websocket) => {
  // NOTE(review): the stream built here on error is returned to highland's
  // stopOnError callback and is not handed to any consumer visible in this
  // file - confirm whether that is intended.
  const replaceOnError = () => tickerStream(newClient())
  const messages = h('message', s)
  return messages.stopOnError(replaceOnError)
}
/**
 * Builds a highland pipeline that keeps only messages whose `type` is
 * `'match'` and emits each one's `price` property.
 *
 * @returns {Object} A new pipeline on every call.
 */
const matchedPrice = () => {
  const onlyMatches = h.where({ type: 'match' })
  const toPrice = h.pluck('price')
  return h.pipeline(onlyMatches, toPrice)
}
/**
 * Builds a handler that summarises one batch of prices and reports the result.
 *
 * The batch is wrapped in a highland stream and piped through
 * `stream-statistics`; the first value that comes out is passed to `cb`
 * together with `label`.
 *
 * `label` is now an explicit parameter. It used to be read as a free variable
 * that does not exist in this scope, which threw a ReferenceError as soon as a
 * batch was summarised.
 *
 * @param {Function} cb - Invoked as `cb(stats, label)` once per batch.
 * @param {*} [label] - Forwarded unchanged to `cb`.
 * @returns {Function} Handler that accepts a batch (array) of prices.
 */
const tapOrder = (cb, label) => orders => h(orders)
  .through(streamStatistics())
  .toArray(results => cb(results[0], label))

/**
 * Builds the pipeline that turns raw ticker messages into periodic stats
 * callbacks.
 *
 * Stages: extract matched prices, group them with
 * `batchWithTimeOrCount(duration, 100000)`, then summarise every batch as a
 * side effect via `tap` (the batch itself still flows downstream).
 *
 * @param {*} label - Passed through to `cb` with each stats result.
 * @param {number} duration - First argument to `batchWithTimeOrCount`.
 * @param {Function} cb - Invoked as `cb(stats, label)` for every batch.
 * @returns {Object} Highland pipeline.
 */
const statStream = (label, duration, cb) => h.pipeline(
  h.through(matchedPrice()),
  h.batchWithTimeOrCount(duration, 100000),
  h.tap(tapOrder(cb, label))
)
const logErrorAndRestart = (label, message) => () => {
console.log(`${label} ${message}, restarting`)
stream()
}
exports.createStatsStream = function (label, duration, cb) {
const stream = () => {
tickerStream()
.through(statStream(label, duration, cb))
.stopOnError(logErrorAndRestart(label, 'encountered an unexpected error'))
.done(logErrorAndRestart(label, 'stats stream ended unexpectedly'))
}
return stream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment