Skip to content

Instantly share code, notes, and snippets.

@slidenerd
Created May 12, 2018 06:58
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save slidenerd/f3019b03f5ea9023dfb365d95e60d161 to your computer and use it in GitHub Desktop.
test mongodb alerts with aggregation pipeline
const
fs = require('fs'),
fse = require('fs-extra'),
MongoClient = require('mongodb').MongoClient,
mongoose = require('mongoose'),
ObjectID = require('mongodb').ObjectID,
request = require('request')
// Fiat currency codes that are kept as alert destinations.
// The helpers in this file treat an empty list as "no filtering".
const includes = [
    "AUD", "BRL", "CAD", "CHF", "CLP", "CNY", "CZK", "DKK",
    "EUR", "GBP", "HKD", "HUF", "IDR", "ILS", "INR", "JPY",
    "KRW", "MXN", "MYR", "NOK", "NZD", "PHP", "PKR", "PLN",
    "RUB", "SEK", "SGD", "THB", "TRY", "TWD", "USD", "ZAR",
]
/**
 * Flattens a quotes payload into a list of `{ symbol, price }` destinations.
 *
 * Every key of `fiats.quotes` has its first "USD" occurrence removed to obtain the
 * symbol (e.g. "USDEUR" -> "EUR"). When the module-level `includes` list is non-empty,
 * only symbols present in that list are returned.
 *
 * @param {Object} [fiats={}] payload whose `quotes` property maps pair keys to prices
 * @returns {{symbol: string, price: *}[]} destinations in the key order of `fiats.quotes`;
 *          an empty array when the payload or its `quotes` property is missing
 */
function normalizeDestinations(fiats = {}) {
    // A missing payload (or one without `quotes`) yields no destinations instead of
    // throwing from Object.keys(undefined).
    const quotes = (fiats && fiats.quotes) || {}
    const filterEnabled = Boolean(includes && includes.length)
    return Object.keys(quotes)
        .map(pair => ({ symbol: pair.replace("USD", ""), price: quotes[pair] }))
        .filter(({ symbol }) => !filterEnabled || includes.indexOf(symbol) >= 0)
}
/**
 * Generates random price alerts (test data) plus the derived "unique pair" documents
 * and a find() query, all using numeric field names.
 *
 * Layouts produced:
 *  - alerts:          _id, 1 = user id, 2 = source id, 3 = destination symbol,
 *                     4 = "source:destination" id, 5 = alert price, 6 = direction, 7 = type
 *  - condensedAlerts: _id, 1 = user id, 2 = "source:destination" id, 3 = alert price,
 *                     4 = direction, 5 = type
 *  - unique:          _id = "source:destination" id, 1 = source id, 2 = destination symbol,
 *                     3 = current pair price, 4 = number of alerts on the pair
 *  - condensedUnique: _id = "source:destination" id, 1 = current pair price, 2 = alert count
 *  - query:           `$or` of per-pair conditions written against the condensed alert layout
 *
 * Direction false means a "less than" alert, true a "greater than" alert.
 *
 * @param {Object[]} [sources=[]] items with `id` and `price_usd`; items whose `price_usd`
 *        is null/undefined are never picked
 * @param {Object[]} [destinations=[]] items with `symbol` and `price`
 * @param {number} [count=10] number of alerts to generate
 * @param {number} [sourceLimit=10] only the first `sourceLimit` sources are candidates
 * @param {number} [destinationLimit=10] only the first `destinationLimit` destinations are candidates
 * @param {number} [userCount=100] number of random user ids the alerts are spread over
 * @param {number} [alertProbability=0.1] accepted for compatibility; not used
 * @returns {{alerts: Object[], unique: Object[], condensedAlerts: Object[], condensedUnique: Object[], query: Object}}
 *          all lists are empty (and `query.$or` is empty) when there is no priced source
 *          or no destination to draw from
 */
function getAlerts(sources = [], destinations = [], count = 10, sourceLimit = 10, destinationLimit = 10, userCount = 100, alertProbability = 0.1) {
    // Random price within +/- `multiplier` of the current pair price, rounded to 2 decimals.
    const getRandomPrice = (sourcePrice, destinationPrice, multiplier = 0.1) => {
        const product = sourcePrice * destinationPrice
        const lower = product - multiplier * product
        const upper = product + multiplier * product
        return +((lower + (upper - lower) * Math.random()).toFixed(2))
    }
    const pickRandom = items => items[Math.floor(Math.random() * items.length)]

    // Filtering up front replaces the old "retry until a priced source is hit" loop, which
    // never terminated when no candidate had a price and threw on an empty source list.
    // A limit below 1 is treated as 1, which is what a limit of 0 effectively did before.
    const sourcePool = sources
        .slice(0, Math.max(1, Math.min(sourceLimit, sources.length)))
        .filter(source => source.price_usd !== null && source.price_usd !== undefined)
    const destinationPool = destinations
        .slice(0, Math.max(1, Math.min(destinationLimit, destinations.length)))

    const alerts = []
    const condensedAlerts = []
    const unique = []
    const condensedUnique = []
    const queries = []
    if (sourcePool.length === 0 || destinationPool.length === 0) {
        return { alerts, unique, condensedAlerts, condensedUnique, query: { $or: queries } }
    }

    const userIds = []
    for (let i = 0; i < userCount; i++) {
        userIds.push(new ObjectID())
    }

    // Keyed by "source:destination"; a Map keeps first-seen order for the output lists.
    const pairs = new Map()
    for (let i = 0; i < count; i++) {
        const source = pickRandom(sourcePool)
        const destination = pickRandom(destinationPool)
        const id = source.id + ":" + destination.symbol
        const userId = pickRandom(userIds)
        const price = getRandomPrice(+source.price_usd, destination.price)
        const direction = Math.random() >= 0.5
        alerts.push({
            _id: new ObjectID(), //unique alert id
            1: userId, //unique user id
            2: source.id, //from
            3: destination.symbol, //to
            4: id,
            5: price, //price
            6: direction, //false = less than alert, true = greater than alert
            7: 0, //type, 0 = price alert, percentage alert
        })
        condensedAlerts.push({
            _id: new ObjectID(), //unique alert id
            1: userId, //unique user id
            2: id,
            3: price, //price
            4: direction, //false = less than alert, true = greater than alert
            5: 0, //type, 0 = price alert, percentage alert
        })
        const known = pairs.get(id)
        if (known) {
            known['4'] += 1
        }
        else {
            pairs.set(id, {
                _id: id,
                1: source.id,
                2: destination.symbol,
                3: +source.price_usd * destination.price,
                4: 1
            })
        }
    }

    for (const [id, pair] of pairs) {
        unique.push({ _id: id, 1: pair['1'], 2: pair['2'], 3: pair['3'], 4: pair['4'] })
        condensedUnique.push({ _id: id, 1: pair['3'], 2: pair['4'] })
        // Field numbers below refer to the condensed alert layout (2 = id, 3 = price, 4 = direction).
        queries.push({
            2: { $eq: id },
            $or: [
                { 4: { $eq: false }, 3: { $gte: pair['3'] } },
                { 4: { $eq: true }, 3: { $lte: pair['3'] } },
            ]
        })
    }
    return { alerts, unique, condensedAlerts, condensedUnique, query: { $or: queries } }
}
/**
 * Builds one upserting `updateOne` bulk operation per source.
 *
 * Each operation targets the document whose `_id` is the source id and sets
 * field 1 to the numeric `price_usd` and field 2 to the symbol.
 *
 * @param {Object[]} [sources=[]] items with `id`, `price_usd` and `symbol`
 * @returns {Object[]} operations suitable for `collection.bulkWrite`
 */
function getSourceOperations(sources = []) {
    return sources.map(source => ({
        updateOne: {
            "filter": { _id: source.id },
            // updateOne needs update operators; a bare replacement document is rejected by
            // current drivers. `_id` is not set here: on upsert it comes from the filter.
            "update": {
                $set: {
                    1: +source.price_usd,
                    2: source.symbol
                }
            },
            "upsert": true
        }
    }))
}
/**
 * Builds one upserting `updateOne` bulk operation per destination currency.
 *
 * Each operation targets the document whose `_id` is the destination symbol and sets
 * field 1 to its price. When the module-level `includes` list is non-empty, destinations
 * whose symbol is not in the list are skipped.
 *
 * @param {Object[]} [destinations=[]] items with `symbol` and `price`
 * @returns {Object[]} operations suitable for `collection.bulkWrite`
 */
function getDestinationOperations(destinations = []) {
    const filterEnabled = Boolean(includes && includes.length)
    return destinations
        .filter(destination => !filterEnabled || includes.indexOf(destination.symbol) >= 0)
        .map(destination => ({
            updateOne: {
                "filter": { _id: destination.symbol },
                // updateOne needs update operators; a bare replacement document is rejected
                // by current drivers.
                "update": {
                    $set: { 1: destination.price }
                },
                "upsert": true
            }
        }))
}
/**
 * Runs a bulk write against a collection of the local `testalerts` database and logs
 * the elapsed time together with the driver's result counters.
 *
 * Fire-and-forget: nothing is returned and failures are only logged.
 *
 * @param {string} collectionName collection to write to
 * @param {Object[]} operations bulkWrite operations
 */
function upsertNative(collectionName, operations) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        // Without this guard a failed connection crashed on `client.db` of undefined.
        if (err) {
            console.log(err, "upsertNative", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        db.collection(collectionName).bulkWrite(operations, (error, result) => {
            const t2 = new Date().getTime()
            if (error) {
                console.log(error, "upsertNative", collectionName, (t2 - t1) / 1000, "seconds")
            }
            else {
                console.log(
                    "upsertNative",
                    collectionName,
                    (t2 - t1) / 1000,
                    "seconds",
                    "upserted", result.upsertedCount,
                    "inserted", result.insertedCount,
                    "deleted", result.deletedCount,
                    "matched", result.matchedCount,
                    "modified", result.modifiedCount
                )
            }
            // Close on both outcomes; previously a failed write left the connection open.
            client.close()
        })
    })
}
/**
 * Replaces the contents of a collection in the local `testalerts` database: drops the
 * collection when it already exists, then inserts `items`.
 *
 * Fire-and-forget: nothing is returned and failures are only logged.
 *
 * @param {string} collectionName collection to recreate
 * @param {Object[]} items documents to insert
 */
function deleteInsert(collectionName, items) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        // Without this guard a failed connection crashed on `client.db` of undefined.
        if (err) {
            console.log(err, "deleteInsert", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        db.listCollections()
            .toArray()
            .then(collections => {
                // Only drop when the collection is actually listed.
                const exists = collections.some(collection => collection.name === collectionName)
                if (exists) {
                    return db.collection(collectionName).drop()
                }
            })
            .then(dropped => {
                if (dropped) {
                    console.log("Deleted", collectionName, dropped)
                }
                return db.collection(collectionName).insertMany(items)
            })
            .then(result => {
                console.log("Inserted", collectionName, result.insertedCount, result.result.n, result.result.ok)
            })
            .catch(console.log)
            // Runs after success and failure alike, so the connection is always released.
            .then(() => {
                client.close()
            })
    })
}
/**
 * Runs an aggregation pipeline against a collection of the local `testalerts` database,
 * logs the elapsed time and writes the first 20 result documents to `fileName` as JSON.
 *
 * Fire-and-forget: nothing is returned and failures are only logged.
 *
 * @param {string} collectionName collection to aggregate on
 * @param {string} fileName path of the JSON file receiving the result sample
 * @param {Object[]} pipeline aggregation stages
 */
function aggregate(collectionName, fileName, pipeline) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        // Without this guard a failed connection crashed on `client.db` of undefined.
        if (err) {
            console.log(err, "Aggregation", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        // The option is spelled allowDiskUse; the previous "allowUseDisk" is not a recognised option name.
        db.collection(collectionName).aggregate(pipeline, { allowDiskUse: true }).toArray((error, result) => {
            const t2 = new Date().getTime()
            client.close()
            if (error) {
                // `result` is undefined here, so there is nothing to write.
                console.log(error, "Aggregation", collectionName, (t2 - t1) / 1000, "seconds")
                return
            }
            console.log("Aggregation", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
            fse.writeJson(fileName, result.slice(0, 20), { spaces: 4 }).catch(console.log)
        })
    })
}
/**
 * Downloads a JSON document and stores it in a local file. Does nothing unless
 * `refresh` is true.
 *
 * Fire-and-forget: nothing is returned and failures are only logged.
 *
 * @param {string} url address to fetch
 * @param {string} fileName path of the JSON file to write
 * @param {boolean} [refresh=false] when false the call is a no-op
 */
function refreshData(url, fileName, refresh = false) {
    if (!refresh) {
        return
    }
    request(url, { encoding: 'utf-8' }, (error, response, body) => {
        if (error) {
            console.log(error)
            return
        }
        if (response.statusCode !== 200) {
            console.log(response.statusCode, response.statusMessage)
            return
        }
        let coins
        try {
            coins = JSON.parse(body)
        }
        catch (parseError) {
            // A non-JSON body would otherwise throw out of the request callback.
            console.log(parseError)
            return
        }
        console.log("Saving", coins.length, "coins", fileName)
        fse.writeJson(fileName, coins, {}).catch(console.log)
    })
}
/**
 * Runs a find() query against a collection of the local `testalerts` database and logs
 * the elapsed time and the number of matching documents.
 *
 * Fire-and-forget: nothing is returned and failures are only logged.
 *
 * @param {string} collectionName collection to query
 * @param {Object} query MongoDB filter document
 */
function queryAlerts(collectionName, query) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        // Without this guard a failed connection crashed on `client.db` of undefined.
        if (err) {
            console.log(err, "query alerts", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        db.collection(collectionName).find(query).toArray((error, result) => {
            const t2 = new Date().getTime()
            if (error) {
                console.log(error, "query alerts", collectionName, (t2 - t1) / 1000, "seconds")
            }
            else {
                console.log("query alerts", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
            }
            client.close()
        })
    })
}
// Helpers exposed to other modules. refreshData is not exported; it is only
// invoked from this file.
module.exports = {
aggregate,
deleteInsert,
getAlerts,
getDestinationOperations,
getSourceOperations,
normalizeDestinations,
queryAlerts,
upsertNative,
}
// Both calls pass refresh = false, so nothing is downloaded; pass true to re-create
// the cmc.json / fiats.json snapshots.
refreshData("https://api.coinmarketcap.com/v1/ticker/?limit=0", "cmc.json", false)
// The apilayer access key is read from the APILAYER_ACCESS_KEY environment variable
// so that the credential is not committed with the source.
refreshData("http://apilayer.net/api/live?access_key=" + encodeURIComponent(process.env.APILAYER_ACCESS_KEY || "") + "&currencies=&source=USD&format=2", "fiats.json", false)
Index.js
const
fs = require('fs'),
fse = require('fs-extra'),
utils = require('./utils');
/**
 * Builds the aggregation that finds triggered alerts by joining every document of
 * `e1_alerts` with the current price of its source and of its destination.
 *
 * Alert fields used: 2 = source id, 3 = destination symbol, 5 = alert price,
 * 6 = direction (false = "less than" alert, true = "greater than" alert).
 * Field 4 holds the "source:destination" id string, so the price comparison reads
 * field 5 and the direction test reads field 6.
 *
 * Output documents carry `m` (current pair price) and `c` (alert price >= current price).
 * A "less than" alert matches when `c` is true, a "greater than" alert when `c` is false.
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateAlerts() {
    const collectionName = "e1_alerts"
    // Current pair price: source price times destination price.
    const currentPrice = { "$multiply": ["$s", "$d.1"] }
    const pipeline = [
        {
            "$lookup": {
                "from": "e1_sources",
                "localField": "2",
                "foreignField": "_id",
                "as": "s"
            }
        },
        { "$unwind": "$s" },
        {
            // Keep the alert fields and reduce the joined source to its price (field 1).
            "$project": {
                "_id": 0,
                "1": 1,
                "2": 1,
                "3": 1,
                "4": 1,
                "5": 1,
                "6": 1,
                "7": 1,
                "s": "$s.1"
            }
        },
        {
            "$lookup": {
                "from": "e1_destinations",
                "localField": "3",
                "foreignField": "_id",
                "as": "d"
            }
        },
        { "$unwind": "$d" },
        {
            "$project": {
                "1": 1,
                "2": 1,
                "3": 1,
                "4": 1,
                "5": 1,
                "6": 1,
                "7": 1,
                "m": currentPrice,
                "c": { "$gte": ["$5", currentPrice] }
            }
        },
        {
            "$match": {
                "$or": [
                    { "6": false, "c": true },
                    { "6": true, "c": false }
                ]
            }
        }
    ]
    return { collectionName, pipeline }
}
/**
 * Builds the aggregation that starts from the unique pairs in `e1_unique_alerts`,
 * recomputes each pair's current price from `e1_sources` and `e1_destinations`, joins
 * the pair's alerts from `e1_alerts` with a plain localField/foreignField lookup and
 * finally filters the triggered ones.
 *
 * Unique-pair documents keep the "source:destination" id in `_id`, and alerts store the
 * same id in field 4, so `_id` is carried through the projections and used as the join key.
 *
 * Output fields: _id = alert id, 1 = user id, 2 = pair id, 3 = alert price,
 * 4 = direction, 5 = type, 6 = current pair price.
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateUniqueAlertsNoPipelineMultipleLookups() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
        {
            "$lookup": {
                "from": "e1_sources",
                "localField": "1",
                "foreignField": "_id",
                "as": "s"
            }
        },
        { "$unwind": "$s" },
        {
            // Field 4 becomes the source price; `_id` (the pair id) must survive for the alert join.
            "$project": {
                "_id": 1,
                "1": 1,
                "2": 1,
                "4": "$s.1"
            }
        },
        {
            "$lookup": {
                "from": "e1_destinations",
                "localField": "2",
                "foreignField": "_id",
                "as": "d"
            }
        },
        { "$unwind": "$d" },
        // Field 4 becomes the current pair price (source price times destination price).
        { "$project": { "_id": 1, "1": 1, "2": 1, "4": { "$multiply": ["$4", "$d.1"] } } },
        {
            "$lookup": {
                "from": "e1_alerts",
                "localField": "_id",
                "foreignField": "4",
                "as": "a"
            }
        },
        { "$unwind": "$a" },
        {
            "$project": {
                "_id": "$a._id",
                "1": "$a.1",
                "2": "$_id",
                "3": "$a.5",
                "4": "$a.6",
                "5": "$a.7",
                "6": "$4"
            }
        },
        {
            // direction false: alert price >= current price; direction true: alert price <= current price.
            "$match": {
                "$expr": {
                    "$or": [
                        {
                            "$and": [
                                { "$eq": ["$4", false] },
                                { "$gte": ["$3", "$6"] }
                            ]
                        },
                        {
                            "$and": [
                                { "$eq": ["$4", true] },
                                { "$lte": ["$3", "$6"] }
                            ]
                        }
                    ]
                }
            }
        }
    ]
    return { collectionName, pipeline }
}
// Benchmark: all cryptos = 1525, all fiats = 32, unique alerts count = 42445, alerts count = 100000, time taken = 17.798 seconds.
// Indexes were created on the e1_alerts collection for source, destination, and the source:destination combination.
/**
 * Builds the aggregation that starts from the unique pairs in `e1_unique_alerts`,
 * recomputes each pair's current price with two plain lookups, and selects the triggered
 * alerts inside a pipeline-style `$lookup` on `e1_alerts` (the last join only).
 *
 * Unique-pair documents keep the "source:destination" id in `_id`, and alerts store the
 * same id in field 4, so `_id` is carried through the projections and bound to `$$pair`.
 * Alert fields used in the sub-pipeline: 4 = pair id, 5 = alert price, 6 = direction.
 *
 * Output fields: _id = alert id, 1 = user id, 2 = pair id, 3 = alert price,
 * 4 = direction, 5 = type, 6 = current pair price.
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateUniqueAlertsPipelineLastStage() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
        {
            "$lookup": {
                "from": "e1_sources",
                "localField": "1",
                "foreignField": "_id",
                "as": "s"
            }
        },
        { "$unwind": "$s" },
        {
            // Field 4 becomes the source price; `_id` (the pair id) must survive for the alert join.
            "$project": {
                "_id": 1,
                "1": 1,
                "2": 1,
                "4": "$s.1"
            }
        },
        {
            "$lookup": {
                "from": "e1_destinations",
                "localField": "2",
                "foreignField": "_id",
                "as": "d"
            }
        },
        { "$unwind": "$d" },
        // Field 4 becomes the current pair price (source price times destination price).
        { "$project": { "_id": 1, "1": 1, "2": 1, "4": { "$multiply": ["$4", "$d.1"] } } },
        {
            "$lookup": {
                "from": "e1_alerts",
                "let": {
                    "pair": "$_id",
                    "price": "$4"
                },
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    { "$eq": ["$$pair", "$4"] },
                                    {
                                        // direction false: alert price >= current; direction true: alert price <= current.
                                        "$or": [
                                            {
                                                "$and": [
                                                    { "$eq": ["$6", false] },
                                                    { "$gte": ["$5", "$$price"] }
                                                ]
                                            },
                                            {
                                                "$and": [
                                                    { "$eq": ["$6", true] },
                                                    { "$lte": ["$5", "$$price"] }
                                                ]
                                            }
                                        ]
                                    }
                                ]
                            }
                        }
                    }
                ],
                "as": "a"
            }
        },
        { "$unwind": "$a" },
        {
            "$project": {
                "_id": "$a._id",
                "1": "$a.1",
                "2": "$_id",
                "3": "$a.5",
                "4": "$a.6",
                "5": "$a.7",
                "6": "$4",
            }
        }
    ]
    return { collectionName, pipeline }
}
// Benchmark: all cryptos = 1525, all fiats = 32, unique alerts count = 42445, alerts count = 100000, time taken = 24.5 seconds.
// Indexes were created on the e1_alerts collection for source, destination, and the source:destination combination; verdict: pipelines = bad.
/**
 * Builds the aggregation that starts from the unique pairs in `e1_unique_alerts` and uses
 * pipeline-style `$lookup` stages for all three joins (sources, destinations, alerts).
 *
 * Alert fields used in the last sub-pipeline: 2 = source id, 3 = destination symbol,
 * 5 = alert price, 6 = direction (false = "less than", true = "greater than").
 * Field 4 of an alert is the "source:destination" id string, so it is not a price.
 *
 * Output fields: _id = alert id, 1 = user id, 2 = source id, 3 = destination symbol,
 * 4 = alert price, 5 = direction, 6 = type, 7 = current pair price.
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateUniqueAlertsPipelineAll() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
        {
            "$lookup": {
                "from": "e1_sources",
                "let": { source: "$1" },
                "pipeline": [
                    { "$match": { "$expr": { "$eq": ["$_id", "$$source"] } } },
                    { "$project": { "1": 1, "_id": 0 } }
                ],
                "as": "s"
            }
        },
        { "$unwind": "$s" },
        {
            // Field 3 becomes the source price.
            "$project": {
                "_id": 0,
                "1": 1,
                "2": 1,
                "3": "$s.1"
            }
        },
        {
            "$lookup": {
                "from": "e1_destinations",
                "let": { destination: "$2" },
                "pipeline": [
                    { "$match": { "$expr": { "$eq": ["$_id", "$$destination"] } } },
                    { "$project": { "_id": 0 } }
                ],
                "as": "d"
            }
        },
        { "$unwind": "$d" },
        {
            // Field 3 becomes the current pair price (source price times destination price).
            "$project": {
                "1": 1,
                "2": 1,
                "3": { "$multiply": ["$3", "$d.1"] }
            }
        },
        {
            "$lookup": {
                "from": "e1_alerts",
                "let": { source: "$1", destination: "$2", price: "$3" },
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    { "$eq": ["$$source", "$2"] },
                                    { "$eq": ["$$destination", "$3"] },
                                    {
                                        // direction false: alert price >= current; direction true: alert price <= current.
                                        "$or": [
                                            {
                                                "$and": [
                                                    { "$eq": ["$6", false] },
                                                    { "$gte": ["$5", "$$price"] }
                                                ]
                                            },
                                            {
                                                "$and": [
                                                    { "$eq": ["$6", true] },
                                                    { "$lte": ["$5", "$$price"] }
                                                ]
                                            }
                                        ]
                                    }
                                ]
                            }
                        }
                    }
                ],
                "as": "a"
            }
        },
        { "$unwind": "$a" },
        {
            "$project": {
                "_id": "$a._id",
                "1": "$a.1",
                "2": "$1",
                "3": "$2",
                "4": "$a.5",
                "5": "$a.6",
                "6": "$a.7",
                "7": "$3"
            }
        }
    ]
    return { collectionName, pipeline }
}
/**
 * Builds the aggregation that joins each unique pair of `e1_unique_alerts` with its
 * alerts in `e1_alerts` (pair `_id` against alert field 4) and keeps the alerts whose
 * price condition holds against the price stored on the pair (field 3).
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateUniqueAlertsNoLookups() {
    // One side of the trigger test: direction flag (field 4) equals `direction` and the
    // alert price (field 3) compares to the pair price (field 6) with `operator`.
    const triggeredWhen = (direction, operator) => ({
        "$and": [
            { "$eq": ["$4", direction] },
            { [operator]: ["$3", "$6"] }
        ]
    })

    const joinAlerts = {
        "$lookup": { "from": "e1_alerts", "localField": "_id", "foreignField": "4", "as": "a" }
    }
    const onePerAlert = { "$unwind": "$a" }
    const reshape = {
        "$project": {
            "_id": "$a._id",
            "1": "$a.1",
            "2": "$a.4",
            "3": "$a.5",
            "4": "$a.6",
            "5": "$a.7",
            "6": "$3"
        }
    }
    const keepTriggered = {
        "$match": {
            "$expr": {
                "$or": [triggeredWhen(false, "$gte"), triggeredWhen(true, "$lte")]
            }
        }
    }

    return {
        collectionName: "e1_unique_alerts",
        pipeline: [joinAlerts, onePerAlert, reshape, keepTriggered]
    }
}
/**
 * Builds the aggregation that joins each unique pair of `e1_unique_alerts_condensed`
 * with its alerts in `e1_alerts_condensed` (pair `_id` against alert field 2) and keeps
 * the alerts whose price condition holds against the price stored on the pair (field 1).
 *
 * @returns {{collectionName: string, pipeline: Object[]}} collection to aggregate on and its stages
 */
function aggregateUniqueAlertsCondensedNoLookups() {
    // One side of the trigger test: direction flag (field 4) equals `direction` and the
    // alert price (field 3) compares to the pair price (field 6) with `operator`.
    const triggeredWhen = (direction, operator) => ({
        "$and": [
            { "$eq": ["$4", direction] },
            { [operator]: ["$3", "$6"] }
        ]
    })

    const joinAlerts = {
        "$lookup": { "from": "e1_alerts_condensed", "localField": "_id", "foreignField": "2", "as": "a" }
    }
    const onePerAlert = { "$unwind": "$a" }
    const reshape = {
        "$project": {
            "_id": "$a._id",
            "1": "$a.1",
            "2": "$a.2",
            "3": "$a.3",
            "4": "$a.4",
            "5": "$a.5",
            "6": "$1"
        }
    }
    const keepTriggered = {
        "$match": {
            "$expr": {
                "$or": [triggeredWhen(false, "$gte"), triggeredWhen(true, "$lte")]
            }
        }
    }

    return {
        collectionName: "e1_unique_alerts_condensed",
        pipeline: [joinAlerts, onePerAlert, reshape, keepTriggered]
    }
}
/**
 * Rebuilds the test database from the local `fiats.json` and `cmc.json` snapshots:
 * upserts sources and destinations, regenerates random alerts, recreates the four alert
 * collections, writes the generated query to `query.json` and, 30 seconds later, runs
 * that query against `e1_alerts_condensed`. Does nothing unless `refresh` is true.
 *
 * The `utils` database helpers are fire-and-forget, so the returned promise resolves
 * once the work has been started, not once it has finished.
 *
 * @param {boolean} [refresh=false] when false the call is a no-op
 * @returns {Promise<void>} rejects when a snapshot cannot be read or parsed
 */
async function refreshDatabase(refresh = false) {
    if (!refresh) {
        return
    }
    // The two snapshots are independent, so read them concurrently.
    const [rawFiats, cryptos] = await Promise.all([
        fse.readJson("fiats.json"),
        fse.readJson("cmc.json")
    ])
    const fiats = utils.normalizeDestinations(rawFiats)
    utils.upsertNative("e1_sources", utils.getSourceOperations(cryptos))
    utils.upsertNative("e1_destinations", utils.getDestinationOperations(fiats))
    const { alerts, unique, condensedAlerts, condensedUnique, query } = utils.getAlerts(cryptos, fiats, 10000, 1, 1)
    // Optional dumps of the generated fixtures:
    // fse.writeJson("alerts.json", alerts)
    // fse.writeJson("unique.json", unique)
    // fse.writeJson("condensed_alerts.json", condensedAlerts)
    // fse.writeJson("condensed_unique.json", condensedUnique)
    // Log a failed write instead of leaving the promise unhandled.
    fse.writeJson("query.json", query, { spaces: 4 }).catch(console.log)
    utils.deleteInsert("e1_alerts", alerts)
    utils.deleteInsert("e1_unique_alerts", unique)
    utils.deleteInsert("e1_alerts_condensed", condensedAlerts)
    utils.deleteInsert("e1_unique_alerts_condensed", condensedUnique)
    // The inserts above cannot be awaited; the query is simply delayed by 30 seconds.
    setTimeout(() => {
        utils.queryAlerts("e1_alerts_condensed", query)
    }, 30000)
}
// Script entry point: rebuilds the database, then optionally runs one of the benchmark
// aggregations (both are disabled through the run1/run2 flags).
(function () {
    // refreshDatabase is async; log a failure instead of leaving an unhandled rejection.
    refreshDatabase(true).catch(console.log)
    const run1 = false
    const run2 = false
    if (run1) {
        const { collectionName, pipeline } = aggregateUniqueAlertsNoLookups()
        utils.aggregate(collectionName, "aggregation.json", pipeline)
    }
    if (run2) {
        const { collectionName, pipeline } = aggregateUniqueAlertsCondensedNoLookups()
        utils.aggregate(collectionName, "aggregation_condensed.json", pipeline)
    }
})()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment