-
-
Save 8secz-johndpope/4bf52a0d40071bfa75bfea35aea7f5b9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const dotenv = require('dotenv') | |
const myEnv = dotenv.config() | |
const currentEnv = process.env.NODE_ENV | |
const regionCode = process.env.REGION_CODE | |
const configFile = './' + regionCode + '/config.' + currentEnv + '.json' | |
var config = require(configFile) | |
const AWS = require('aws-sdk') | |
// console.log(config); | |
AWS.config.update({ region: config.aws.region }) | |
const ddb = new AWS.DynamoDB(config.dynamoDB) | |
const docClient = new AWS.DynamoDB.DocumentClient(config.dynamoDB) | |
// DYNAMODB | |
/**
 * Writes the starting item for one environment into the `LastRowId` table,
 * with `rowId` set to -1.
 *
 * Fire-and-forget: the outcome is only logged and nothing is returned.
 *
 * @param {*} env - value stored as the item's `env` attribute.
 */
function putInitialRowId(env) {
  const request = {
    TableName: 'LastRowId',
    Item: { env, rowId: -1 },
  }

  console.log('Adding a new item...')
  docClient.put(request, (err, data) => {
    if (!err) {
      console.log('Added item:', JSON.stringify(data, null, 2))
      return
    }
    console.error(
      'Unable to add item. Error JSON:',
      JSON.stringify(err, null, 2)
    )
  })
}
/**
 * Creates the `LastRowId` table (partition key `env`, string) and then seeds
 * one item per environment ('staging', 'beta', 'prod') via putInitialRowId.
 *
 * The seed items are written only after `createTable` succeeded and the
 * `tableExists` waiter reports the table as available. Previously the puts
 * were issued immediately after the create request was sent, i.e. before the
 * table could exist, and also when creation had failed.
 *
 * Failures are logged, never thrown: the returned promise always resolves
 * (with undefined), once seeding has been kicked off or a failure was logged.
 *
 * @returns {Promise<void>}
 */
async function createDynamoTable() {
  console.log('INFO - createDynamoTable')
  const tableName = 'LastRowId'
  const params = {
    TableName: tableName,
    KeySchema: [
      { AttributeName: 'env', KeyType: 'HASH' }, // partition key
    ],
    AttributeDefinitions: [{ AttributeName: 'env', AttributeType: 'S' }],
    ProvisionedThroughput: {
      ReadCapacityUnits: 10,
      WriteCapacityUnits: 10,
    },
  }
  const environments = ['staging', 'beta', 'prod']

  return new Promise((resolve) => {
    // Call DynamoDB to create the table
    ddb.createTable(params, (err, data) => {
      if (err) {
        console.error(
          'Unable to create table. Error JSON:',
          JSON.stringify(err, null, 2)
        )
        resolve()
        return
      }
      console.log(
        'Created table. Table description JSON:',
        JSON.stringify(data, null, 2)
      )
      // createTable returns while the table is still being created; wait
      // until it is usable before writing to it.
      ddb.waitFor('tableExists', { TableName: tableName }, (waitErr) => {
        if (waitErr) {
          console.error(
            'Table did not become available. Error JSON:',
            JSON.stringify(waitErr, null, 2)
          )
          resolve()
          return
        }
        for (const env of environments) {
          putInitialRowId(env)
        }
        resolve()
      })
    })
  })
}
/**
 * Stores `rowId` on the `LastRowId` item whose key is the current environment
 * (`currentEnv`).
 *
 * Returns a promise so callers that `await` this function actually wait for
 * the write to finish (it previously returned undefined). Failures are still
 * only logged, so the promise never rejects.
 *
 * @param {*} rowId - value written to the item's `rowId` attribute.
 * @returns {Promise<object|undefined>} the update response (requested with
 *   ReturnValues 'UPDATED_NEW'), or undefined when the update failed.
 */
function updateLastAffectedRow(rowId) {
  const params = {
    TableName: 'LastRowId',
    Key: {
      env: currentEnv,
    },
    UpdateExpression: 'set rowId = :r',
    ExpressionAttributeValues: {
      ':r': rowId,
    },
    ReturnValues: 'UPDATED_NEW',
  }

  console.log('Updating the item...')
  return new Promise((resolve) => {
    docClient.update(params, (err, data) => {
      if (err) {
        console.error(
          'Unable to update item. Error JSON:',
          JSON.stringify(err, null, 2)
        )
        resolve(undefined)
        return
      }
      resolve(data)
    })
  })
}
/**
 * Looks up the `LastRowId` item keyed by the current environment
 * (`currentEnv`) and hands the outcome to `callback`.
 *
 * @param {Function} callback - passed straight through to `docClient.get`.
 * @returns {*} whatever `docClient.get` returns.
 */
function getLastTimeStamp(callback) {
  const key = { env: currentEnv }
  return docClient.get({ TableName: 'LastRowId', Key: key }, callback)
}
// Public surface of this module: the three table helpers plus the two
// DynamoDB clients they are built on.
Object.assign(module.exports, {
  getLastTimeStamp,
  updateLastAffectedRow,
  createDynamoTable,
  ddb,
  docClient,
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// you must have aws cli installed https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html | |
// in .aws/ credentials should include rdsProxy user | |
const MySQLEvents = require("@rodrigogs/mysql-events"); | |
const mysql = require("mysql"); | |
const mongoose = require("mongoose"); | |
const webpush = require("web-push"); | |
const dotenv = require("dotenv"); | |
const myEnv = dotenv.config(); | |
const currentEnv = process.env.NODE_ENV; | |
const regionCode = process.env.REGION_CODE; | |
const configFile = "./" + regionCode + "/config." + currentEnv + ".json"; | |
console.log("configFile:", configFile); | |
var config = require(configFile); | |
const fs = require("fs"); | |
const AWS = require("aws-sdk"); | |
AWS.config.update({ region: config.aws.region }); | |
const { exec } = require("child_process"); | |
const awslogger = require("./aws-logger.js"); | |
const awsDdb = require("./aws-dynamodb.js"); | |
const { env, exit } = require("process"); | |
const { Consumer } = require("sqs-consumer"); | |
const { Producer } = require("sqs-producer"); | |
var os = require("os"); | |
const maxAttempts = 5; | |
// AWS stuff for active / passive node
// Set by getInstanceId() once the metadata lookup succeeds; undefined until then.
let currentInstanceId;
// Metadata service client: 5 second HTTP timeout, up to 10 retries,
// retry delay base of 200.
let metadata = new AWS.MetadataService({
  maxRetries: 10,
  retryDelayOptions: { base: 200 },
  httpOptions: { timeout: 5000 },
});
/**
 * Asks the metadata service for this machine's instance id and stores it in
 * `currentInstanceId`. On failure it retries by calling itself until the
 * attempt counter reaches `maxAttempts`. Does nothing on macOS.
 *
 * @param {number} attempts - number of attempts already made (callers start at 0).
 */
function getInstanceId(attempts) {
  if (process.platform === "darwin") {
    console.info("bailing... ");
    return;
  }

  metadata.request("/latest/meta-data/instance-id", (err, instanceId) => {
    if (!err) {
      console.info("Current instance id " + instanceId);
      currentInstanceId = instanceId;
      return;
    }

    const attemptsSoFar = attempts + 1;
    console.info("attempt # ", attemptsSoFar, " to grab instance id...failed");
    if (attemptsSoFar < maxAttempts) {
      getInstanceId(attemptsSoFar);
    }
  });
}
// create custom producer (supporting all opts as per the API docs: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#constructor-property) | |
// Producer for the worker queue, configured from the `sqs` section of the config.
const sqsSettings = config.sqs;
const producer = Producer.create({
  queueUrl: sqsSettings.workerQueueUrl,
  region: sqsSettings.region,
  accessKeyId: sqsSettings.accessKeyId,
  secretAccessKey: sqsSettings.secretAccessKey,
});

// Resume point: events with an older timestamp are dropped by the triggers.
// Starts at -1 and is overwritten by resumeFromLastTimeStamp().
let lastTimeStamp = -1;

// MySQL connection handed to the binlog watcher.
let connection = mysql.createConnection(config.db);

// Watch every table of the configured database ('*' would match everything).
let dbFilterExpression = `${config.db.db}.*`;
/**
 * Trigger for INSERT statements on the watched database.
 *
 * Reads the id from the first affected row and, unless the event is older
 * than `lastTimeStamp`, queues work for it:
 *  - inserts into "Video" are dispatched with `swf: 1` and the
 *    "mp4DownloadProcess" activity;
 *  - inserts into Account / VideoLike / VideoComment / Challenge /
 *    Competition are dispatched as `{ event, xid }`.
 * Dispatch failures are logged, not rethrown.
 */
const insertTrigger = {
  name: "Monitor database instance",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.INSERT,
  onEvent: async (event) => {
    const firstRow = event.affectedRows[0];
    const xid = firstRow.after.id;
    const timestamp = event.timestamp;

    if (!(timestamp >= lastTimeStamp)) {
      console.log("dropping...:", xid);
      return;
    }

    const logFailure = (err) => {
      if (err) {
        console.log(err);
      }
    };

    if (event.table === "Video") {
      console.log("🍄 - post process video using aws - swf:", event.table);
      const swfData = {
        event,
        xid,
        swf: 1,
        activity: "mp4DownloadProcess",
      };
      dispatchWork(timestamp, swfData).catch(logFailure);
    }

    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    const dispatchedTables = [
      "Account",
      "VideoLike",
      "VideoComment",
      "Challenge",
      "Competition",
    ];
    if (dispatchedTables.includes(event.table)) {
      console.log("🍄 - dispatchWork insert on table:", event.table);
      dispatchWork(timestamp, { event, xid }).catch(logFailure);
    }
  },
};
/**
 * Trigger for DELETE statements on the watched database.
 *
 * Reads the id from the first affected row's `before` image and, unless the
 * event is older than `lastTimeStamp`, dispatches `{ event, xid }` for
 * deletions from "VideoLike". Dispatch failures are logged, not rethrown.
 */
const deleteTrigger = {
  name: "MonitorDelete",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.DELETE,
  onEvent: async (event) => {
    console.log("delete! ", event);
    const firstRow = event.affectedRows[0];
    const xid = firstRow.before.id;
    const timestamp = event.timestamp;
    console.log("lsatTimeStamp: ", lastTimeStamp);

    if (!(timestamp >= lastTimeStamp)) {
      console.log("dropping...:", xid);
      return;
    }
    console.log("passed the condition");

    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    if (event.table !== "VideoLike") {
      return;
    }
    console.log("delete event on VideoLike!");
    dispatchWork(timestamp, { event, xid }).catch((err) => {
      if (err) {
        console.log(err);
      }
    });
  },
};
/**
 * Trigger for UPDATE statements on the watched database.
 *
 * Reads the id from the first affected row's `after` image and, unless the
 * event is older than `lastTimeStamp`, dispatches `{ event, xid }` for
 * updates to VideoLike / VideoComment / Challenge / Competition.
 * Dispatch failures are logged, not rethrown.
 */
const updateTrigger = {
  name: "MonitorUpdate",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.UPDATE,
  onEvent: async (event) => {
    const firstRow = event.affectedRows[0];
    const xid = firstRow.after.id;
    const timestamp = event.timestamp;

    if (!(timestamp >= lastTimeStamp)) {
      console.log("dropping...:", xid);
      return;
    }

    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    const dispatchedTables = [
      "VideoLike",
      "VideoComment",
      "Challenge",
      "Competition",
    ];
    if (!dispatchedTables.includes(event.table)) {
      return;
    }
    console.log("New UPDATE - update firebase / slack");
    dispatchWork(timestamp, { event, xid }).catch((err) => {
      if (err) {
        console.log(err);
      }
    });
  },
};
// Binlog watcher on the shared MySQL connection. `startAtEnd` is a top-level
// option; it previously also appeared inside `excludedSchemas`, whose keys
// are schema names, so the stray entry has been removed. The `mysql` system
// schema stays excluded.
const myInstance = new MySQLEvents(connection, {
  startAtEnd: true,
  excludedSchemas: {
    mysql: true,
  },
});

// Register the INSERT, DELETE and UPDATE handlers defined above.
myInstance.addTrigger(insertTrigger);
myInstance.addTrigger(deleteTrigger);
myInstance.addTrigger(updateTrigger);
/**
 * Ends the shared MySQL `connection`, if there is one.
 *
 * Completion is signalled both ways so every existing caller works:
 * the returned promise resolves, and the optional `done` callback is invoked,
 * once `connection.end` has reported back. Previously a callback passed by the
 * caller was silently ignored and nothing waited for `end` to finish.
 *
 * Errors from `end` (reported or thrown) are logged and do not reject, so
 * shutdown and reconnect paths always continue.
 *
 * @param {Function} [done] - optional, called with no arguments when finished.
 * @returns {Promise<void>}
 */
function disconnectMysql(done) {
  return new Promise((resolve) => {
    let settled = false;
    const finish = () => {
      if (settled) {
        return;
      }
      settled = true;
      if (typeof done === "function") {
        done();
      }
      resolve();
    };

    if (!connection) {
      finish();
      return;
    }

    try {
      // Passing a callback routes end() failures here instead of leaving
      // them unobserved.
      connection.end((err) => {
        if (err) {
          console.error("error disconnectMysql:", err);
        }
        finish();
      });
    } catch (e) {
      console.error("error disconnectMysql:", e);
      finish();
    }
  });
}
// On process exit: stop the binlog watcher and close the MySQL connection.
process.on("exit", () => {
  myInstance.stop();
  disconnectMysql();
});

// Route each of these signals through terminator() before logging it.
for (const sig of [
  "SIGHUP",
  "SIGINT",
  "SIGQUIT",
  "SIGILL",
  "SIGTRAP",
  "SIGABRT",
  "SIGBUS",
  "SIGFPE",
  "SIGUSR1",
  "SIGSEGV",
  "SIGUSR2",
  "SIGTERM",
]) {
  process.on(sig, () => {
    terminator(sig);
    console.log("signal: " + sig);
  });
}
/**
 * Signal handler body: closes the MySQL connection and then exits the
 * process with code 1.
 *
 * The exit step is chained on the promise returned by disconnectMysql().
 * It used to be passed as a callback argument that was never invoked, so the
 * process did not exit after a handled signal. The exit runs whether the
 * disconnect succeeds or fails.
 *
 * @param {*} sig - signal name; anything that is not a string is ignored
 *   apart from the final log line.
 */
function terminator(sig) {
  if (typeof sig === "string") {
    const shutdown = () => {
      console.log("Received %s - terminating server app ...", sig);
      process.exit(1);
    };
    disconnectMysql().then(shutdown, shutdown);
  }
  console.log("Node server stopped.");
}
/**
 * Persists `timestamp` through awsDdb.updateLastAffectedRow when it is
 * greater than the in-memory `lastTimeStamp`; otherwise does nothing.
 *
 * @param {*} timestamp - timestamp of the event that was just dispatched.
 */
async function updateLastTimeStamp(timestamp) {
  const isNewer = timestamp > lastTimeStamp;
  if (!isNewer) {
    return;
  }
  await awsDdb.updateLastAffectedRow(timestamp);
}
/**
 * Sends one "work" message carrying `obj` (JSON-encoded in the `dataObj`
 * message attribute) to the worker queue, then records `timestamp` as
 * processed.
 *
 * The timestamp update is awaited so that a failure there rejects this
 * function's promise and reaches the caller's `.catch`, instead of being left
 * as a floating promise.
 *
 * @param {*} timestamp - event timestamp passed to updateLastTimeStamp.
 * @param {object} obj - payload; must be JSON-serialisable.
 * @returns {Promise<void>}
 */
async function dispatchWork(timestamp, obj) {
  console.log("INFO:dispatchWork");
  await producer.send({
    id: Math.random().toString(36).substring(7),
    body: "work",
    groupId: currentEnv,
    deduplicationId: Math.random().toString(36).substring(7), // typically a hash of the message body
    messageAttributes: {
      dataObj: { DataType: "String", StringValue: JSON.stringify(obj) },
    },
  }); // TODO - make it send then mark as done
  await updateLastTimeStamp(timestamp);
}
/**
 * Starts the binlog watcher and installs its error handlers.
 *
 * On a CONNECTION_ERROR or ZONGJI_ERROR event the watcher is stopped, the
 * MySQL connection is ended and handleDisconnect() is scheduled 2 seconds
 * later (which in turn calls connect() again).
 *
 * Because connect() runs again on every reconnect, existing listeners for the
 * two events are removed before the new ones are attached; otherwise each
 * reconnect would stack another handler and a single error would schedule
 * several reconnects. The promise returned by start() is also observed so a
 * failed start is logged rather than left unhandled.
 */
function connect() {
  // WARNING - this needs to sync up to python config.py
  // there's a frequency that we need to be mindful of here
  // if there's 10,000 inserts in a second - we don't want to update firebase 10,000 times
  // we need to do the first - then come back 10 seconds later and do another... maybe?

  // Shared handler body; `announceRetry` keeps the extra log line that only
  // the connection-error path prints.
  const watch = (eventName, label, announceRetry) => {
    myInstance.removeAllListeners(eventName);
    myInstance.on(eventName, (err) => {
      if (!err) {
        return;
      }
      console.error(label, new Date(), err);
      if (!!myInstance) {
        console.log("INFO: attempting stop... ");
        myInstance.stop();
      }
      if (!!connection) {
        console.log("INFO: attempting end... ");
        connection.end();
      }
      if (announceRetry) {
        console.log("INFO: attempting handleDisconnect... ");
      }
      setTimeout(handleDisconnect, 2000);
    });
  };

  watch(
    MySQLEvents.EVENTS.CONNECTION_ERROR,
    "👺 MySQLEvents.EVENTS.CONNECTION_ERROR: ",
    true
  );
  watch(
    MySQLEvents.EVENTS.ZONGJI_ERROR,
    "👺 MySQLEvents.EVENTS.ZONGJI_ERROR: ",
    false
  );

  const started = myInstance.start();
  if (started && typeof started.catch === "function") {
    started.catch((err) => {
      console.error("👺 MySQLEvents start failed: ", new Date(), err);
    });
  }
}
/**
 * Reconnect step scheduled by the watcher's error handlers: closes the MySQL
 * connection via disconnectMysql() and, once that settles successfully,
 * calls connect() again. Any failure along the way goes to console.error.
 */
function handleDisconnect() {
  console.log("👻 handleDisconnect()");

  const reconnect = () => {
    connect();
  };
  disconnectMysql().then(reconnect).catch(console.error);
}
/**
 * Loads the stored resume point through awsDdb.getLastTimeStamp, copies its
 * `rowId` into `lastTimeStamp` and then calls connect().
 *
 * The returned promise now settles only after the lookup has completed
 * (it used to resolve immediately, before anything had happened):
 *  - resolves once connect() has been called;
 *  - rejects with the lookup error (or an error thrown by connect()), in
 *    which case connect() is not retried here.
 *
 * If the response carries no item or no `rowId`, `lastTimeStamp` keeps its
 * current value instead of the callback throwing on `response.Item.rowId`.
 *
 * @returns {Promise<void>}
 */
async function resumeFromLastTimeStamp() {
  return new Promise((resolve, reject) => {
    awsDdb.getLastTimeStamp((err, response) => {
      if (err) {
        reject(err);
        return;
      }
      try {
        console.log("getLastTimeStamp:", response);
        const item = response && response.Item;
        if (item && item.rowId !== undefined && item.rowId !== null) {
          lastTimeStamp = item.rowId;
        } else {
          console.warn(
            "getLastTimeStamp: no stored rowId found, keeping:",
            lastTimeStamp
          );
        }
        console.info("lastTimeStamp:", lastTimeStamp);
        connect();
        resolve();
      } catch (e) {
        reject(e);
      }
    });
  });
}
// Helper function for the use case where the coop issues a truckload of messages to the queue.
/**
 * Purges every message from the worker queue (`config.sqs.workerQueueUrl`).
 *
 * The purge call now passes a params object with `QueueUrl` plus a callback.
 * Before, it was given the bare URL string and no callback, so the request
 * was built but never sent. A failed purge is logged and otherwise ignored.
 *
 * NOTE(review): this runs at startup, so with this fix the worker queue really
 * is emptied on every start - confirm that is the intended behaviour.
 */
function purgeWorkerQueue() {
  const sqs = new AWS.SQS({
    queueUrl: config.sqs.healthCheckQueueUrl,
    region: config.sqs.region,
    accessKeyId: config.sqs.accessKeyId,
    secretAccessKey: config.sqs.secretAccessKey,
  });

  sqs.purgeQueue({ QueueUrl: config.sqs.workerQueueUrl }, (err) => {
    if (err) {
      console.error("Unable to purge worker queue:", err);
    }
  });
}
// Startup sequence: purge the worker queue, look up this host's instance id,
// then load the stored resume point and start listening.
purgeWorkerQueue();
getInstanceId(0);
resumeFromLastTimeStamp()
  .then(() => {
    console.log("INFO: Waiting for database events...database:", config.db.db);
  })
  .catch(console.error);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment