Skip to content

Instantly share code, notes, and snippets.

@zloadmin
Created December 11, 2019 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zloadmin/cf10c048498b90926e032e2bf7614131 to your computer and use it in GitHub Desktop.
Save zloadmin/cf10c048498b90926e032e2bf7614131 to your computer and use it in GitHub Desktop.
#!/usr/bin/env node
// WebSocket client class from the `websocket` package; the single instance
// below is wired up with event handlers and connected at the end of the file.
let WebSocketClient = require('websocket').client;
let client = new WebSocketClient();
// moment is used by showLog() to timestamp every log line.
let moment = require('moment');
// MongoDB connection string and database name used in the 'connect' handler.
const url = 'mongodb://localhost:27017';
const dbName = 'binance';
// Disabled alternative collection name; the escapes below spell "matic_btcs".
// const collectionName = "\u006D\u0061\u0074\u0069\u0063\u005F\u0062\u0074\u0063\u0073";
// Collection that receives one document per accepted stream message.
const collectionName = "trades";
const MongoClient = require('mongodb').MongoClient;
const assert = require('assert');
// Redis client created with no explicit options; the message handler calls
// hset() on it.
let redis = require("redis"), redis_client = redis.createClient();
// Full list of symbols; getSymbols() returns the slice owned by this process.
var symbols = require('./symbols.json');
// Base URL to which getStremLink() appends the per-symbol stream names.
const stream_link = 'wss://stream.binance.com:9443/stream?streams=';
// Print the sharding configuration at startup. getInfo is a function
// declaration, so it is hoisted and callable before its definition.
getInfo();
// Report a timestamped log line whenever the WebSocket client emits
// 'connectFailed'.
client.on('connectFailed', (failure) => {
    showLog(`Connect Error: ${failure.toString()}`);
});
// Log Redis failures. The listener has to live on redis_client: the message
// says "Redis Error", and an EventEmitter that emits 'error' with no listener
// attached throws, which would take the whole process down on a Redis hiccup.
// (It was previously attached to the WebSocket client by mistake.)
redis_client.on("error", function (err) {
    showLog("Redis Error: " + err);
});
/**
 * WebSocket 'connect' handler.
 *
 * Opens a MongoDB connection and, once it is ready, starts consuming stream
 * messages: every accepted message payload is enriched with time-bucket
 * fields, inserted into the configured collection, and (when `m` is false)
 * its quantity is written to the `laravel_database_pair_quantity` Redis hash.
 *
 * Any WebSocket error or close terminates the process with exit code 1.
 */
client.on('connect', function (connection) {
    showLog('WebSocket Client Connected');

    // The callback parameter is named mongoClient so that it does not shadow
    // the module-level WebSocket `client`.
    MongoClient.connect(url, { useNewUrlParser: true }, function (err, mongoClient) {
        // Fail fast: without a database there is nothing useful to do.
        assert.equal(null, err);
        showLog("Connected successfully to server");
        const collection = mongoClient.db(dbName).collection(collectionName);

        // NOTE: this listener is only registered after MongoDB is connected.
        connection.on('message', function (message) {
            if (message.type !== 'utf8') {
                return;
            }

            let item;
            try {
                item = JSON.parse(message.utf8Data);
            } catch (parseError) {
                // One malformed frame must not crash the collector.
                showLog('Skipping malformed message: ' + parseError.message);
                return;
            }

            if (!isStorableTrade(item)) {
                return;
            }

            const trade = item.data;
            addTimeBuckets(trade);
            trade.q_float = parseFloat(trade.q);

            collection.insertOne(trade, function (insertError) {
                // Fail fast on write errors, as before.
                assert.equal(insertError, null);
            });

            if (trade.m === false) {
                redis_client.hset('laravel_database_pair_quantity', trade.s.toLowerCase(), trade.q);
            }
        });
    });

    connection.on('error', function (error) {
        showLog("Connection Error: " + error.toString());
        process.exit(1);
    });

    connection.on('close', function () {
        showLog('Connection Closed');
        process.exit(1);
    });
});

/**
 * Decide whether a parsed stream message carries a payload worth storing.
 *
 * @param {*} item - Result of JSON.parse on a text frame.
 * @returns {boolean} true when `item.data` is an object with a defined `q`,
 *   a string `s`, a non-zero numeric `E` and a boolean `m`.
 */
function isStorableTrade(item) {
    // typeof null === 'object', so null has to be excluded explicitly.
    if (typeof item !== 'object' || item === null) {
        return false;
    }
    const data = item.data;
    if (typeof data !== 'object' || data === null) {
        return false;
    }
    return typeof data.q !== 'undefined' &&
        typeof data.s === 'string' &&
        typeof data.E === 'number' &&
        typeof data.m === 'boolean' &&
        // Compare the value itself; `typeof x != 0` is always true.
        data.E !== 0;
}

/**
 * Add truncated time-bucket fields derived from `trade.E` (mutates `trade`).
 * The divisors treat `E` as a millisecond timestamp.
 *
 * @param {Object} trade - Message payload with a numeric `E` field.
 * @returns {void}
 */
function addTimeBuckets(trade) {
    const second = 1000;
    const minute = second * 60;
    const hour = minute * 60;
    // Assignment order is kept stable so stored documents keep their field order.
    trade.E_sec = Math.trunc(trade.E / second);
    trade.E_min = Math.trunc(trade.E / minute);
    trade.E_min_5 = Math.trunc(trade.E / (minute * 5));
    trade.E_min_15 = Math.trunc(trade.E / (minute * 15));
    trade.E_hour = Math.trunc(trade.E / hour);
    trade.E_hour_4 = Math.trunc(trade.E / (hour * 4));
}
/**
 * Print a log line prefixed with the current timestamp and the process label.
 *
 * @param {string} message - Text to print after the prefix.
 * @returns {void}
 */
function showLog(message) {
    const stamp = moment().format();
    const label = getProcessString();
    console.log(`${stamp} ${label} ${message}`);
}
/**
 * Build the "Process X of Y." label used as a log prefix.
 *
 * @returns {string} Label combining the current process number and the total.
 */
function getProcessString() {
    const current = getCurrentNumProcs();
    const total = getNumProcs();
    return `Process ${current} of ${total}.`;
}
/**
 * Build the combined-stream URL for the symbols owned by this process.
 *
 * Each symbol contributes one `<symbol>@aggTrade.b10` stream name; names are
 * separated by '/'. Joining (instead of appending a separator and trimming
 * the last character) keeps the base URL intact when the symbol list is
 * empty, and avoids the deprecated String.prototype.substr.
 *
 * @returns {string} stream_link followed by the '/'-joined stream names.
 */
function getStremLink() {
    const streamNames = getSymbols().map(function (symbol) {
        return symbol + '@aggTrade.b10';
    });
    return stream_link + streamNames.join('/');
}
/**
 * Return the symbols assigned to this process.
 *
 * @returns {string[]} The slice of `symbols` between getStartSlise()
 *   (inclusive) and getEndSlise() (exclusive).
 */
function getSymbols() {
    // The former second `return ['maticbtc'];` was unreachable and is removed.
    return symbols.slice(getStartSlise(), getEndSlise());
}
/**
 * Count the symbols assigned to this process.
 *
 * @returns {number} Length of the array returned by getSymbols().
 */
function getCountSymbols() {
    const ownSymbols = getSymbols();
    return ownSymbols.length;
}
/**
 * Count all known symbols, regardless of which process handles them.
 *
 * @returns {number} Length of the module-level `symbols` list.
 */
function getSymbolsCount() {
    const { length: total } = symbols;
    return total;
}
/**
 * Compute how many symbols each process should handle.
 *
 * Uses the ceiling of (symbol count / process count). The previous
 * `floor(...) + 1` produced a chunk one larger than necessary whenever the
 * symbol count divided evenly, which could leave the last process(es) with an
 * empty slice (e.g. 12 symbols over 4 processes gave chunks of 4, so the
 * fourth process received nothing).
 *
 * @returns {number} Chunk size used by getStartSlise() / getEndSlise().
 */
function getSizeOfSymbolPath() {
    return Math.ceil(getSymbolsCount() / getNumProcs());
}
/**
 * Compute the index of the first symbol owned by this process.
 *
 * @returns {number} (current process number * chunk size) - chunk size.
 */
function getStartSlise() {
    const procNumber = getCurrentNumProcs();
    const chunk = getSizeOfSymbolPath();
    return procNumber * chunk - chunk;
}
/**
 * Compute the exclusive end index of this process's symbol slice.
 *
 * @returns {number} Start index plus the chunk size.
 */
function getEndSlise() {
    const begin = getStartSlise();
    return begin + getSizeOfSymbolPath();
}
/**
 * Read the total number of processes from the first CLI argument.
 *
 * The argument is parsed explicitly as base 10 so that a prefix such as "0x"
 * is never interpreted as hexadecimal.
 *
 * @returns {number} parseInt of process.argv[2], or 1 when it is absent.
 */
function getNumProcs() {
    const rawTotal = process.argv[2];
    return typeof rawTotal !== 'undefined' ? parseInt(rawTotal, 10) : 1;
}
/**
 * Read this process's 1-based number from the second CLI argument.
 *
 * The argument is a 0-based index on the command line, so 1 is added. It is
 * parsed explicitly as base 10 so that a prefix such as "0x" is never
 * interpreted as hexadecimal.
 *
 * @returns {number} parseInt of process.argv[3] plus 1, or 1 when it is absent.
 */
function getCurrentNumProcs() {
    const rawIndex = process.argv[3];
    return typeof rawIndex !== 'undefined' ? parseInt(rawIndex, 10) + 1 : 1;
}
/**
 * Print the sharding configuration of this process to stdout, one
 * "label: value" line per setting.
 *
 * @returns {void}
 */
function getInfo() {
    // Each entry pairs a label with the getter whose result is printed; the
    // getter is invoked right before its line is written.
    const report = [
        ['Number of process: ', getNumProcs],
        ['Current number of process: ', getCurrentNumProcs],
        ['Symbols count: ', getSymbolsCount],
        ['Size of symbols path: ', getSizeOfSymbolPath],
        ['Start slice: ', getStartSlise],
        ['End slice: ', getEndSlise],
        ['Current path of symbols: ', getCountSymbols],
        ['Stream link: ', getStremLink],
    ];
    for (const [label, readValue] of report) {
        console.log(label + readValue());
    }
}
// Start the WebSocket connection using the stream URL built for this process.
const streamUrl = getStremLink();
client.connect(streamUrl);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment