Skip to content

Instantly share code, notes, and snippets.

@jhsea3do
Created March 6, 2019 08:42
Show Gist options
  • Save jhsea3do/9a85d2a5bf4a010bfea28a395fc9eff6 to your computer and use it in GitHub Desktop.
Save jhsea3do/9a85d2a5bf4a010bfea28a395fc9eff6 to your computer and use it in GitHub Desktop.
okex ticker
const { PublicClient, V3WebsocketClient } = require("@okfe/okex-node");
const level = require('level');
const _ = require('lodash');
const PouchDB = require('pouchdb').plugin(require('pouchdb-upsert'));
const uuidv4 = require('uuid/v4');
const uuidv5 = require('uuid/v5');
const debug = (...args)=>(console.log(...args))
const excns = 'okex';
const wssns = {
'futures/candle' : [
'timestamp', 'open', 'high', 'low', 'close', 'volume', 'currency_volume'
]
}
class Core {
constructor() {
this.channels = new Set;
this.symbols = new Set;
this.favsyms = new Set(['XRP']);
this.futures = new Set;
this.spots = new Set;
this.LOCK = [ __dirname, 'data', 'app.lock' ].join('/');
this.threads = new Set;
this.subs = new Set;
}
async stop() {
for ( let t of this.threads ) {
try { clearInterval(t); } catch (e) { console.log(e); }
try { clearTimeout(t); } catch (e) { console.log(e); }
}
this.wss.emit('tic_close');
// this.wss.close();
}
async start(m=1) {
this.test(1000*60*m);
}
async test(timeout=(1000*5)) {
debug('!', 'timeout', timeout);
this.wss.connect();
let t = setTimeout(()=>{
this.stop();
}, timeout);
this.threads.add(t);
}
async listen() {
const { writeFileSync, existsSync } = require('fs');
if(existsSync(this.LOCK)) {
return;
}
writeFileSync(this.LOCK, '');
this.wss.connect();
let t = setInterval(()=>{
if(!existsSync(this.LOCK)) {
this.stop();
} else {
debug('!', 'skip');
}
}, 1000 * 5);
this.threads.add(t);
}
async handle(msg) {
if(msg.event && msg.event == 'subscribe') {
this.subs.add(msg.channel);
debug('!', 'subs', this.subs.size);
return true;
} else if(msg.event && msg.event == 'unsubscribe') {
this.subs.delete(msg.channel);
if(this.subs.size == 0) {
this.wss.emit('tic_clean');
}
return true;
} else if (msg.table && msg.data) {
for(let row of msg.data) {
let type = this.tabf(msg.table);
let base = type.split('/')[1];
let fields = wssns[type];
let body = row[base];
let record = _(_.cloneDeep(row)).omit(base).value();
let i = 0;
for(let field of fields) {
record[field] = body[i]; i++;
}
let uuid = null;
if( record.instrument_id && record.timestamp ) {
uuid = uuidv5([ record.instrument_id, record.timestamp ].join(','), uuidv5.DNS);
} else {
uuid = uuidv4();
}
let coll = this.dbm(msg.table);
coll.upsert( uuid, (d) => {
return Object.assign(d, record)
}).then(debug).catch(debug);
}
return true;
} else {
debug('>', msg);
return false;
}
}
async refresh() {
debug('pub', this.pub);
debug('channels', this.channels);
// this.channels.add("spot/ticker:BTC-USDT");
let instruments = await this.cacher('instruments');
let wss = this.wss;
if(null == instruments) {
await this.cachew('instruments', await this.pub.futures().getInstruments());
instruments = await this.cacher('instruments');
}
for(let inst of instruments) {
const sym = inst.underlying_index;
if(this.favsyms.has(sym)) {
this.symbols.add(sym);
this.futures.add(inst.instrument_id);
}
}
for(let inst of this.futures) {
for(let tab of _.keys(wssns)) {
this.channels.add([ [ tab, '60s'].join(''), inst ].join(':'));
}
}
wss.on('tic_sub', ()=>{
for ( const c of this.channels ) {
debug('!', 'sub', c);
wss.subscribe(c);
}
});
wss.on('tic_unsub', ()=>{
for ( const c of this.channels ) {
debug('!', 'unsub', c);
wss.unsubscribe(c);
}
});
wss.on('open', ()=>{
wss.emit('tic_sub');
});
wss.on('tic_clean', ()=>{
debug('!', 'tic cleanned');
if(this._waitClose === true) {
wss.close();
}
wss.emit('tic_reps');
});
wss.on('tic_reps', (url='http://localhost:5984/db')=>{
debug('!', 'replicate');
for(let name of this.tabs.keys()) {
let dest = [url, _.snakeCase([ excns, this.tabf(name)].join('/'))].join('/');
debug('!', 'replicating', dest, '...');
this.dbm(name).replicate.to(dest);
}
});
wss.on('tic_close', ()=>{
wss.emit('tic_unsub');
this._waitClose = true;
});
wss.on('message', (data, ...args)=>{
// debug('>', data, ...args);
if(data) {
try {
this.handle(JSON.parse(data));
} catch (e) { debug('!', 'handle failed', data); }
}
});
for(let sym of this.symbols) {
debug(sym)
}
for(let ins of this.futures) {
debug(ins)
}
}
async cacher(key, de=JSON.parse) {
let ret = null;
try {
let val = await this.cache.get(key);
ret = de(val);
} catch (e) {
ret = null;
}
return ret;
}
async cachew(key, obj, en=JSON.stringify) {
let ret = null;
try {
let val = en(obj);
ret = await this.cache.put(key, val);
} catch (e) {
ret = null;
}
return ret;
}
get wss() { return this._wss; }
set wss(o) { this._wss = o; }
get pub() { return this._pub; }
set pub(o) { this._pub = o; }
get channels() { return this._channels; }
set channels(o) { this._channels = o; }
get symbols() { return this._symbols; }
set symbols(o) { this._symbols = o; }
}
main=async()=>{
const core = new Core;
core.wss = new V3WebsocketClient();
core.pub = new PublicClient();
core.cache = level('data/dev/cache');
core.tabs = new Map;
core.tabf = (name)=>{
return _.find(_.keys(wssns), (k)=>(name.startsWith(k)));
}
core.dbm = (name)=>{
let tab = core.tabf(name);
let table = core.tabs.has(tab) ? core.tabs.get(tab) : null;
if(null == table) {
let dest = [ 'data/dev/dbm', _.snakeCase([ excns, core.tabf(tab)].join('/'))].join('/');
table = new PouchDB(dest);
core.tabs.set(tab, table);
}
return table;
}
let m = parseInt(process.argv[2] || '1');
await core.refresh();
await core.start(m);
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment