Skip to content

Instantly share code, notes, and snippets.

@8secz-johndpope
Last active October 13, 2021 06:32
Show Gist options
  • Save 8secz-johndpope/4bf52a0d40071bfa75bfea35aea7f5b9 to your computer and use it in GitHub Desktop.
Save 8secz-johndpope/4bf52a0d40071bfa75bfea35aea7f5b9 to your computer and use it in GitHub Desktop.
// --- aws-dynamodb.js: DynamoDB checkpoint helper ---
// Loads .env, then selects a per-region / per-environment JSON config file,
// e.g. ./us/config.staging.json (REGION_CODE and NODE_ENV come from the env).
const dotenv = require('dotenv')
const myEnv = dotenv.config()
const currentEnv = process.env.NODE_ENV
const regionCode = process.env.REGION_CODE
const configFile = './' + regionCode + '/config.' + currentEnv + '.json'
var config = require(configFile)
const AWS = require('aws-sdk')
// console.log(config);
AWS.config.update({ region: config.aws.region })
// Low-level client (table admin) and DocumentClient (item reads/writes)
// share the same config block. Both are exported at the bottom of the file.
const ddb = new AWS.DynamoDB(config.dynamoDB)
const docClient = new AWS.DynamoDB.DocumentClient(config.dynamoDB)
// DYNAMODB
function putInitialRowId(env) {
  // Seed the LastRowId checkpoint row for one environment with the sentinel
  // value -1 ("no rows processed yet"). Fire-and-forget: errors are logged,
  // not thrown.
  const params = {
    TableName: 'LastRowId',
    Item: { env: env, rowId: -1 },
  }
  console.log('Adding a new item...')
  docClient.put(params, (err, data) => {
    if (err) {
      console.error(
        'Unable to add item. Error JSON:',
        JSON.stringify(err, null, 2)
      )
      return
    }
    console.log('Added item:', JSON.stringify(data, null, 2))
  })
}
async function createDynamoTable() {
  // Create the LastRowId checkpoint table and seed one row per environment.
  // FIX: the old version called putInitialRowId() immediately after issuing
  // createTable, but a fresh DynamoDB table is not writable until it reaches
  // ACTIVE state — the seed puts raced the table creation and could fail.
  // We now await creation and then wait for 'tableExists' before seeding.
  console.log('INFO - createDynamoTable')
  const params = {
    TableName: 'LastRowId',
    KeySchema: [
      { AttributeName: 'env', KeyType: 'HASH' }, // partition key (NOT a sort key)
    ],
    AttributeDefinitions: [
      { AttributeName: 'env', AttributeType: 'S' },
    ],
    ProvisionedThroughput: {
      ReadCapacityUnits: 10,
      WriteCapacityUnits: 10,
    },
  }
  try {
    const data = await ddb.createTable(params).promise()
    console.log(
      'Created table. Table description JSON:',
      JSON.stringify(data, null, 2)
    )
  } catch (err) {
    // An existing table is fine — we still want to (re)seed below.
    if (err.code !== 'ResourceInUseException') {
      console.error(
        'Unable to create table. Error JSON:',
        JSON.stringify(err, null, 2)
      )
      return
    }
  }
  // Block until the table is ACTIVE so the seed writes cannot race creation.
  await ddb.waitFor('tableExists', { TableName: 'LastRowId' }).promise()
  putInitialRowId('staging')
  putInitialRowId('beta')
  putInitialRowId('prod')
}
function updateLastAffectedRow(rowId) {
  // Persist the latest processed row id / timestamp for the current env.
  // FIX: callers do `await awsDdb.updateLastAffectedRow(...)`, but the old
  // callback form returned undefined, so the await resolved before the write
  // finished. Returning the AWS SDK .promise() makes awaiting meaningful.
  // Errors are still logged and swallowed (best-effort, as before).
  const params = {
    TableName: 'LastRowId',
    Key: {
      env: currentEnv,
    },
    UpdateExpression: 'set rowId = :r',
    ExpressionAttributeValues: {
      ':r': rowId,
    },
    ReturnValues: 'UPDATED_NEW',
  }
  console.log('Updating the item...')
  return docClient
    .update(params)
    .promise()
    .catch(function (err) {
      console.error(
        'Unable to update item. Error JSON:',
        JSON.stringify(err, null, 2)
      )
    })
}
function getLastTimeStamp(callback) {
  // Read the persisted checkpoint row for the current environment.
  // The callback receives (err, response) where response.Item.rowId is the
  // stored value; the AWS.Request object is also returned to the caller.
  const request = docClient.get(
    {
      TableName: 'LastRowId',
      Key: { env: currentEnv },
    },
    callback
  )
  return request
}
// Public surface of this helper module; the raw clients are exported too so
// callers can issue their own DynamoDB operations with the same config.
module.exports.getLastTimeStamp = getLastTimeStamp
module.exports.updateLastAffectedRow = updateLastAffectedRow
module.exports.createDynamoTable = createDynamoTable
module.exports.ddb = ddb
module.exports.docClient = docClient
// ---------------------------------------------------------------------------
// SECOND FILE of this gist: the MySQL-binlog → SQS watcher (it requires the
// aws-dynamodb.js helper above). Note it re-declares dotenv/currentEnv/AWS
// etc., so the two files must stay separate — concatenated into one script
// this would be a SyntaxError ("Identifier 'dotenv' has already been declared").
// ---------------------------------------------------------------------------
// Prerequisite: the AWS CLI must be installed — https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html
// and your ~/.aws/credentials should include the rdsProxy user.
const MySQLEvents = require("@rodrigogs/mysql-events");
const mysql = require("mysql");
const mongoose = require("mongoose");
const webpush = require("web-push");
const dotenv = require("dotenv");
const myEnv = dotenv.config();
// REGION_CODE + NODE_ENV select the JSON config file, as in aws-dynamodb.js.
const currentEnv = process.env.NODE_ENV;
const regionCode = process.env.REGION_CODE;
const configFile = "./" + regionCode + "/config." + currentEnv + ".json";
console.log("configFile:", configFile);
var config = require(configFile);
const fs = require("fs");
const AWS = require("aws-sdk");
AWS.config.update({ region: config.aws.region });
const { exec } = require("child_process");
const awslogger = require("./aws-logger.js");
const awsDdb = require("./aws-dynamodb.js");
const { env, exit } = require("process");
const { Consumer } = require("sqs-consumer");
const { Producer } = require("sqs-producer");
var os = require("os");
// Max retries for fetching the EC2 instance id (see getInstanceId below).
const maxAttempts = 5;
// AWS stuff for active / passsive node
let currentInstanceId;
// EC2 instance-metadata client; only usable when running on EC2.
let metadata = new AWS.MetadataService({
httpOptions: { timeout: 5000 }, // 5 second timeout
maxRetries: 10, // retry 10 times
retryDelayOptions: { base: 200 },
});
function getInstanceId(attempts) {
  // Fetch this EC2 instance's id from the metadata service, retrying up to
  // maxAttempts times. On success the id is cached in currentInstanceId.
  // FIX: the old version retried with no delay and gave up silently after the
  // final attempt; we now back off 1s between tries and log the final failure.
  var isMac = process.platform === "darwin";
  if (isMac) {
    // Local dev machine — there is no metadata endpoint to talk to.
    console.info("bailing... ");
    return;
  }
  metadata.request("/latest/meta-data/instance-id", function (err, instanceId) {
    if (err) {
      attempts += 1;
      console.info("attempt # ", attempts, " to grab instance id...failed");
      if (attempts < maxAttempts) {
        // Brief backoff instead of hammering the endpoint in a tight loop.
        setTimeout(function () {
          getInstanceId(attempts);
        }, 1000);
      } else {
        console.error(
          "giving up fetching instance id after",
          attempts,
          "attempts:",
          err
        );
      }
    } else {
      console.info("Current instance id " + instanceId);
      currentInstanceId = instanceId;
    }
  });
}
// create custom producer (supporting all opts as per the API docs: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#constructor-property)
const producer = Producer.create({
queueUrl: config.sqs.workerQueueUrl,
region: config.sqs.region,
accessKeyId: config.sqs.accessKeyId,
secretAccessKey: config.sqs.secretAccessKey,
});
// In-memory checkpoint; -1 until resumeFromLastTimeStamp() loads the value
// persisted in DynamoDB. Binlog events older than this are dropped.
var lastTimeStamp = -1;
var connection = mysql.createConnection(config.db);
// Only watch the configured schema, e.g. "mydb.*".
var dbFilterExpression = config.db.db + ".*";
// var dbFilterExpression = '*'
// Binlog INSERT handler: forwards interesting inserts to the SQS worker queue.
const insertTrigger = {
  name: "Monitor database instance",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.INSERT,
  onEvent: async (event) => {
    // FIX: guard against events that carry no row images — the old code
    // dereferenced affectedRows[0].after unconditionally and could throw.
    const row = event.affectedRows && event.affectedRows[0];
    if (!row || !row.after) return;
    const xid = row.after.id;
    const timestamp = event.timestamp;
    // Drop anything older than the resume checkpoint.
    if (timestamp < lastTimeStamp) {
      console.log("dropping...:", xid);
      return;
    }
    const data = { event: event, xid: xid };
    if (event.table == "Video") {
      // Videos additionally get a SWF post-processing job.
      console.log("🍄 - post process video using aws - swf:", event.table);
      const swfData = {
        event: event,
        xid: xid,
        swf: 1,
        activity: "mp4DownloadProcess",
      };
      dispatchWork(timestamp, swfData).catch((err) => {
        if (err) {
          console.log(err);
        }
      });
    }
    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    const watchedTables = [
      "Account",
      "VideoLike",
      "VideoComment",
      "Challenge",
      "Competition",
    ];
    if (watchedTables.includes(event.table)) {
      console.log("🍄 - dispatchWork insert on table:", event.table);
      dispatchWork(timestamp, data).catch((err) => {
        if (err) {
          console.log(err);
        }
      });
    }
  },
};
// Binlog DELETE handler: only VideoLike deletions are forwarded to the queue.
const deleteTrigger = {
  name: "MonitorDelete",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.DELETE,
  onEvent: async (event) => {
    console.log("delete! ", event);
    // FIX: guard against events with no row images before dereferencing.
    const row = event.affectedRows && event.affectedRows[0];
    if (!row || !row.before) return;
    // For DELETEs the row data lives in `before` (there is no `after`).
    const xid = row.before.id;
    const timestamp = event.timestamp;
    console.log("lastTimeStamp: ", lastTimeStamp); // was misspelled "lsatTimeStamp"
    if (timestamp < lastTimeStamp) {
      console.log("dropping...:", xid);
      return;
    }
    console.log("passed the condition");
    const data = { event: event, xid: xid };
    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    if (event.table == "VideoLike") {
      console.log("delete event on VideoLike!");
      dispatchWork(timestamp, data).catch((err) => {
        if (err) {
          console.log(err);
        }
      });
    }
  },
};
// Binlog UPDATE handler: forwards updates on social tables to the queue.
const updateTrigger = {
  name: "MonitorUpdate",
  expression: dbFilterExpression,
  statement: MySQLEvents.STATEMENTS.UPDATE,
  onEvent: async (event) => {
    // FIX: guard against events with no row images before dereferencing.
    const row = event.affectedRows && event.affectedRows[0];
    if (!row || !row.after) return;
    const xid = row.after.id;
    const timestamp = event.timestamp;
    if (timestamp < lastTimeStamp) {
      console.log("dropping...:", xid);
      return;
    }
    const data = { event: event, xid: xid };
    // TODO - move this stuff to a worker process / see worker.js run these in separate processes.
    const watchedTables = [
      "VideoLike",
      "VideoComment",
      "Challenge",
      "Competition",
    ];
    if (watchedTables.includes(event.table)) {
      console.log("New UPDATE - update firebase / slack");
      dispatchWork(timestamp, data).catch((err) => {
        if (err) {
          console.log(err);
        }
      });
    }
  },
};
// Binlog watcher over the mysql connection. FIX: excludedSchemas maps schema
// names to booleans; the old options object had a stray copy-pasted
// `startAtEnd: true` inside it, which only excluded a (nonexistent) schema
// literally named "startAtEnd" — removed.
const myInstance = new MySQLEvents(connection, {
  startAtEnd: true, // start at the binlog tail; dedupe vs. lastTimeStamp handles resume
  excludedSchemas: {
    mysql: true, // ignore MySQL's internal schema
  },
});
myInstance.addTrigger(insertTrigger);
myInstance.addTrigger(deleteTrigger);
myInstance.addTrigger(updateTrigger);
function disconnectMysql() {
  // Close the MySQL connection, resolving only once the close completes.
  // FIX: the old version called connection.end() without waiting for its
  // callback, so disconnectMysql().then(connect) in handleDisconnect could
  // reconnect while the old socket was still shutting down. Never rejects —
  // errors are logged, matching the old best-effort behavior.
  return new Promise(function (resolve) {
    if (!connection) {
      resolve();
      return;
    }
    try {
      connection.end(function () {
        resolve();
      });
    } catch (e) {
      console.error("error disconnectMysql:", e);
      resolve();
    }
  });
}
// Best-effort cleanup when the event loop drains; note handlers registered on
// "exit" must be synchronous, so the async disconnect may not finish here.
process.on("exit", function () {
// Do some cleanup such as close db
myInstance.stop();
disconnectMysql();
});
// catching signals and do something before exit
[
"SIGHUP",
"SIGINT",
"SIGQUIT",
"SIGILL",
"SIGTRAP",
"SIGABRT",
"SIGBUS",
"SIGFPE",
"SIGUSR1",
"SIGSEGV",
"SIGUSR2",
"SIGTERM",
].forEach(function (sig) {
process.on(sig, function () {
terminator(sig);
console.log("signal: " + sig);
});
});
function terminator(sig) {
  // Graceful shutdown on a delivered signal: close MySQL, then exit.
  // FIX: the old code passed a callback to disconnectMysql(), which takes no
  // arguments — the callback was silently ignored and process.exit(1) never
  // ran, so the process did not actually terminate on SIGINT/SIGTERM/etc.
  if (typeof sig === "string") {
    disconnectMysql().then(function () {
      console.log("Received %s - terminating server app ...", sig);
      process.exit(1);
    });
  }
  console.log("Node server stopped.");
}
async function updateLastTimeStamp(timestamp) {
  // Advance the checkpoint, both in DynamoDB and in memory.
  // FIX: the old version only wrote the remote copy; the local lastTimeStamp
  // (which the triggers compare against and which gates this very write) was
  // never advanced after startup, so every event re-triggered a DDB update.
  if (timestamp > lastTimeStamp) {
    await awsDdb.updateLastAffectedRow(timestamp);
    lastTimeStamp = timestamp;
  }
}
async function dispatchWork(timestamp, obj) {
  // Publish the binlog event to the SQS worker queue, then advance the
  // checkpoint. FIX: the checkpoint update is now awaited — the old call was
  // a floating promise whose failures could never propagate to the trigger's
  // .catch() handler.
  console.log("INFO:dispatchWork");
  await producer.send({
    id: Math.random().toString(36).substring(7),
    body: "work",
    groupId: currentEnv,
    // FIXME: a random deduplicationId defeats FIFO de-duplication entirely —
    // this should be a hash of the message body.
    deduplicationId: Math.random().toString(36).substring(7),
    messageAttributes: {
      dataObj: { DataType: "String", StringValue: JSON.stringify(obj) },
    },
  }); // TODO - make it send then mark as done
  await updateLastTimeStamp(timestamp);
}
function connect() {
  // WARNING - this needs to sync up to python config.py
  // there's a frequence that we need to mindful of this
  // if there's 10,000 inserts in a second - we don't want to update firebase 10,000 times
  // we need to do the first - then come back 10 seconds later and do another... maybe?
  myInstance.start();
  // Shared recovery path for both error events. FIX: the two handlers were
  // copy-pasted and had already drifted (the ZONGJI one was missing the
  // "attempting handleDisconnect" log); a single helper keeps them in sync.
  const makeErrorHandler = (label) => (err) => {
    if (!err) return;
    console.error("👺 " + label + ": ", new Date(), err);
    if (myInstance) {
      console.log("INFO: attempting stop... ");
      myInstance.stop();
    }
    if (connection) {
      console.log("INFO: attempting end... ");
      connection.end();
    }
    console.log("INFO: attempting handleDisconnect... ");
    setTimeout(handleDisconnect, 2000);
  };
  myInstance.on(
    MySQLEvents.EVENTS.CONNECTION_ERROR,
    makeErrorHandler("MySQLEvents.EVENTS.CONNECTION_ERROR")
  );
  myInstance.on(
    MySQLEvents.EVENTS.ZONGJI_ERROR,
    makeErrorHandler("MySQLEvents.EVENTS.ZONGJI_ERROR")
  );
}
function handleDisconnect() {
  // Tear the MySQL connection down cleanly, then re-establish the binlog
  // watcher. Invoked (after a short delay) from the error handlers in connect().
  console.log("👻 handleDisconnect()");
  disconnectMysql()
    .then(function () {
      connect();
    })
    .catch(function (e) {
      console.error(e);
    });
}
function resumeFromLastTimeStamp() {
  // Load the persisted checkpoint from DynamoDB, then start the watcher.
  // FIX: the old async version never awaited the callback, so its promise
  // resolved immediately and the caller's "Waiting for database events" log
  // printed before the checkpoint was even read. It also crashed with a
  // TypeError when no LastRowId item existed for this environment.
  return new Promise(function (resolve, reject) {
    awsDdb.getLastTimeStamp(function (err, response) {
      if (err) {
        console.error(err);
        reject(err);
        return;
      }
      console.log("getLastTimeStamp:", response);
      if (response && response.Item) {
        lastTimeStamp = response.Item.rowId;
      } else {
        // No checkpoint yet (fresh env) — keep the default and carry on.
        console.warn("no LastRowId item found; keeping", lastTimeStamp);
      }
      console.info("lastTimeStamp:", lastTimeStamp);
      connect();
      resolve();
    });
  });
}
// Helper function under the use case - the coop - issues a truck of of messages to queue.
function purgeWorkerQueue() {
  // FIX: purgeQueue expects params of the form { QueueUrl: ... }, and in AWS
  // SDK v2 an operation called without a callback (or .promise()/.send()) is
  // never actually sent — the old code passed a bare URL string and no
  // callback, so the purge silently did nothing. The stray `queueUrl`
  // constructor key (which pointed at the health-check queue) is not a valid
  // SQS client option and has been dropped.
  const sqs = new AWS.SQS({
    region: config.sqs.region,
    accessKeyId: config.sqs.accessKeyId,
    secretAccessKey: config.sqs.secretAccessKey,
  });
  sqs.purgeQueue({ QueueUrl: config.sqs.workerQueueUrl }, function (err) {
    if (err) {
      console.error("purgeWorkerQueue failed:", err);
    }
  });
}
// Startup sequence: drain any stale work, discover which EC2 instance we are
// (for active/passive coordination), then resume watching from the persisted
// checkpoint.
purgeWorkerQueue();
getInstanceId(0);
resumeFromLastTimeStamp()
.then(() =>
console.log("INFO: Waiting for database events...database:", config.db.db)
)
.catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment