Skip to content

Instantly share code, notes, and snippets.

@vilicvane
Created April 9, 2018 06:28
Show Gist options
  • Save vilicvane/9a6451d637b99d93a537da3948848b42 to your computer and use it in GitHub Desktop.
Save vilicvane/9a6451d637b99d93a537da3948848b42 to your computer and use it in GitHub Desktop.
import {Agent as HttpsAgent} from 'https';
import Debug = require('debug');
import _ = require('lodash');
import {inflate} from 'pako';
import * as ProxyAgent from 'proxy-agent';
import {Subject} from 'rxjs';
import WebSocket = require('ws');
const US_API_URL = 'wss://api.huobipro.com/ws';
const HTTPS_PROXY = process.env.HTTPS_PROXY;
const AGENT = HTTPS_PROXY
? new ProxyAgent(HTTPS_PROXY)
: new HttpsAgent({keepAlive: true});
const debug = Debug('ex-master-panorama:huobi');
export class WSConnection {
readonly message$ = new Subject<any>();
private topics: string[] = [];
private ws!: WebSocket;
constructor(private url = US_API_URL) {
this.connect();
}
subscribe(topics: string[]): void {
let newTopics = _.difference(topics, this.topics);
this.topics.push(...newTopics);
if (this.ws.readyState === WebSocket.OPEN) {
this._subscribe(newTopics);
}
}
private connect() {
debug(`proxy ${HTTPS_PROXY || '-'}`);
debug(`connecting to ${this.url}`);
this.ws = new WebSocket(this.url, {
agent: AGENT,
});
this.ws.on('open', () => {
debug('connection opened');
this._subscribe(this.topics);
});
this.ws.on('message', (buffer: Buffer) => {
let message = JSON.parse(inflate(buffer, {to: 'string'}));
if (message.ping) {
this.send({pong: message.ping});
return;
}
if (message.subbed) {
return;
}
this.message$.next(message);
});
this.ws.on('close', () => {
debug('connection closed');
this.connect();
});
}
private _subscribe(topics: string[]): void {
for (let topic of topics) {
this.send({
sub: topic,
id: topic,
});
}
}
private send(message: any): void {
this.ws.send(JSON.stringify(message));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment