Skip to content

Instantly share code, notes, and snippets.

@sameh-sharaf
Created September 12, 2016 09:32
Show Gist options
  • Save sameh-sharaf/f9e11aafa8c45c4143835b24aee4fe4b to your computer and use it in GitHub Desktop.
Save sameh-sharaf/f9e11aafa8c45c4143835b24aee4fe4b to your computer and use it in GitHub Desktop.
This code was meant to be uploaded to AWS Lambda to stream DynamoDB records and write them to S3, so that they can later be loaded into AWS Redshift.
// sprintf-style string formatting (third-party 'sprintf' package).
// BUG FIX: the original assigned to an implicit global; declare it.
var sprintf = require('sprintf').sprintf;
/* Tables whose INSERT commands will be written to a CSV file.
   'articles' and 'range_test' are sample tables used to test the streamer. */
var tables = {
  'articles': ['id', 'title', 'body', 'author', 'publish_date'],
  'range_test': ['hash_id', 'range_id', 'item', 'quantity'],
};
// CSV field separator
const SEPARATOR = ',';
// Length of the SHA-1-derived suffix appended to uploaded file names
const KEY_LENGTH = 12;
// File that stores the timestamp of the last upload
const TIMESTAMP_LOG = 'timestamp.log';
// Minimum interval between uploads: 5 minutes, in milliseconds
const UPLOAD_THRESHOLD = 5 * 60 * 1000;
// Destination S3 bucket
const S3_BUCKET = 'ssharaf-bucket2';
// Maps SQL statement type to the DynamoDB stream event name
var event_state = {
  'INSERT': "INSERT",
  'UPDATE': "MODIFY",
  'DELETE': "REMOVE",
};
//### Connection to S3
var AWS = require('aws-sdk');
AWS.config = new AWS.Config();
var s3 = new AWS.S3();
//###
/**
 * Build a random hexadecimal string of the requested length.
 * @param
 *   length: Desired length of generated HEX string.
 * @return
 *   Generated HEX string.
 **/
function get_random_hex(length) {
  var digits = "0123456789abcdef";
  var out = [];
  while (out.length < length) {
    out.push(digits.charAt(Math.floor(Math.random() * digits.length)));
  }
  return out.join("");
}
/**
 * Compute a SHA-1 HMAC hex digest of the given text, keyed with a
 * random 6-character hex key (so repeated calls on the same text
 * generally differ).
 * @param
 *   text: Text to be encoded.
 * @return
 *   Generated cryptographic SHA-1 code (hex string).
 **/
function get_crypt_key(text) {
  var crypto = require('crypto');
  // update()/digest() is equivalent to the stream write/end/read dance.
  var hmac = crypto.createHmac('sha1', get_random_hex(6));
  hmac.update(text);
  return hmac.digest('hex');
}
/**
 * Replace every occurrence of a literal substring.
 * BUG FIX: the original built `new RegExp(find, 'g')`, so regex
 * metacharacters in `find` (e.g. '.', '+') were interpreted as a
 * pattern; split/join performs a literal, global replacement.
 * @param
 *   find: Substring to search for (treated literally).
 *   replace: Replacement text (also literal).
 *   str: String to operate on.
 * @return
 *   str with all occurrences of find replaced.
 **/
function replaceAll(find, replace, str) {
  return str.split(find).join(replace);
}
// Extract the table name from an event source ARN of the form
// arn:aws:dynamodb:<region>:<account>:table/<name>/stream/<timestamp>
function extract_table_name(record) {
  var resource = record.eventSourceARN.split(':')[5];
  return resource.split('/')[1];
}
/**
* Generate CSV record for given WatchCloud log.
* @param
* record: Event record from WatchCloud logs.
* table_name: Source table which triggered the INSERT event.
* @return
* Generated CSV record.
**/
exports.create_insert_csv = function(record, table_name) {
var line = "";
var record_dyn = null;
if (table_name === 'undefined') {
table_name = extract_table_name(record);
record_dyn = record.dynamodb;
}
else {
record_dyn = record.Dynamodb;
}
tables[table_name].forEach(function(column) {
if (typeof record_dyn.NewImage[column] !== 'undefined') {
Object.keys(record_dyn.NewImage[column]).forEach(function(column_type) {
if (record_dyn.NewImage[column][column_type] != null) {
line += record_dyn.NewImage[column][column_type];
return;
}
});
}
line += SEPARATOR;
});
line = line.substr(0, line.length-1);
return line;
};
/**
 * Generate a WHERE clause for the triggered event based on the event's
 * key attributes, matching each key with its non-null typed value.
 * NOTE(review): values are interpolated without SQL escaping — a key
 * value containing a quote would break (or inject into) the statement.
 * @param
 *   record: dynamodb portion of the stream event record (needs .Keys).
 * @return
 *   Generated SQL WHERE clause, e.g. ` WHERE "id" = '1'`.
 **/
function generate_where_clause(record) {
  var where_clause = " WHERE ";
  Object.keys(record.Keys).forEach(function(key) {
    Object.keys(record.Keys[key]).forEach(function(key_type) {
      if (record.Keys[key][key_type] != null) {
        // Template literal replaces the third-party sprintf dependency.
        where_clause += `"${key}" = '${record.Keys[key][key_type]}' AND `;
      }
    });
  });
  // Trim the trailing " AND " (5 characters).
  where_clause = where_clause.substr(0, where_clause.length - 5);
  return where_clause;
}
/**
* Generate UPDATE clause
* @param
* record: Event record from WatchCloud logs.
* table_name: Source table which triggered the UPDATE event.
* @return
* Generated UPDATE SQL query.
**/
exports.generate_update_query = function (record, table_name) {
var query = "";
if (table_name === 'undefined') {
table_name = extract_table_name(record);
record_dyn = record.dynamodb;
}
else {
record_dyn = record.Dynamodb;
}
query = sprintf('UPDATE "%s" SET ', table_name);
// SET generator
var keys = Object.keys(record_dyn.NewImage);
keys.forEach(function(key) {
Object.keys(record_dyn.NewImage[key]).forEach(function(key_type) {
if ((record_dyn.NewImage[key][key_type] != null) &&
(typeof record_dyn.OldImage[key] === 'undefined'
|| record_dyn.NewImage[key][key_type] != record_dyn.OldImage[key][key_type])) {
query += sprintf('"%s" = \'%s\', ', key, replaceAll("'", "\\'", record_dyn.NewImage[key][key_type]));
}
});
});
// Check for ommited columns
var oldImageKeys = Object.keys(record_dyn.OldImage);
Object.keys(record_dyn.NewImage).forEach(function(key) {
var index = oldImageKeys.indexOf(key);
if (index != -1) {
oldImageKeys.splice(index, 1);
}
});
if (oldImageKeys.length != 0) {
oldImageKeys.forEach(function(key) {
query += sprintf(' "%s" = NULL, ', key);
});
}
// Remove last comma from end of query
query = query.substr(0, query.length - 2);
// Add WHERE clause
query += generate_where_clause(record_dyn) + ";";
return query;
};
/**
* Generate DELETE query
* @param
* record: Event record from WatchCloud logs.
* table_name: Source table which triggered the DELETE event.
* @return
* Generated DELETE SQL query.
**/
exports.generate_delete_query = function (record, table_name) {
if (table_name === 'undefined') {
table_name = extract_table_name(record);
record_dyn = record.dynamodb;
}
else {
record_dyn = record.Dynamodb;
}
return sprintf('DELETE FROM "%s" %s;', table_name, generate_where_clause(record_dyn));
};
/**
 * Append change lines to a log file so they can be uploaded later.
 * @param
 *   dir: Log directory.
 *   file_name: Log file name (without the .log extension).
 *   lines: Array of strings to be logged, one per line.
 * @return
 *   undefined.
 **/
function log_changes(dir, file_name, lines) {
  var fs = require('fs');
  // Template literal replaces the third-party sprintf dependency.
  var path = `${dir}/${file_name}.log`;
  // Make sure the directory exists before the first append.
  if (!fs.existsSync(path))
    create_dir(dir);
  lines.forEach(function(line) {
    fs.appendFileSync(path, line + '\n');
  });
}
/**
 * Upload an SQL/CSV log file to the S3 bucket. The destination key is
 * suffixed with a timestamp and a short random hash so repeated
 * uploads never collide.
 * @param
 *   source_file: Local file to be uploaded.
 *   dest_file: Destination key ending in a 3-letter format (sql/csv).
 * @return
 *   Error string when the source file is missing; otherwise undefined.
 *   NOTE(review): the S3 upload itself is asynchronous, so upload
 *   failures cannot be returned to the caller; they are logged instead
 *   (the original returned the error inside the callback, where it was
 *   silently discarded).
 **/
function upload_to_bucket(source_file, dest_file) {
  var ts_hms = require('moment');
  var fs = require('fs');
  var bucket_name = S3_BUCKET;
  // Last three characters carry the file format (sql/csv).
  var file_format = dest_file.substr(dest_file.length - 3, dest_file.length);
  // Strip ".<fmt>" so the suffix can be inserted before the extension.
  dest_file = dest_file.substr(0, dest_file.length - 4);
  var crypto = get_crypt_key(get_random_hex(32)).substr(0, KEY_LENGTH);
  // Template literal replaces the third-party sprintf dependency.
  var file_name = `${dest_file}-${ts_hms().format()}-${crypto}.${file_format}`;
  console.log("File to read: ", source_file);
  console.log("File to write: ", file_name);
  if (!fs.existsSync(source_file))
    return "ERROR: File does not exist";
  var body = fs.readFileSync(source_file);
  var params = {Bucket: bucket_name, Key: file_name, Body: body};
  s3.putObject(params, function(err, data) {
    if (err) {
      // BUG FIX: log the failure rather than losing it.
      console.error("ERROR: S3 upload failed:", err);
      return;
    }
    console.log(`Successfully uploaded data to ${bucket_name}/${file_name}`);
  });
}
/**
 * Truncate a log file so it is ready for the next log entries.
 * @param
 *   source_file: Log file to be emptied.
 * @return
 *   Error string when the file is missing, the write Error on failure,
 *   otherwise undefined.
 **/
function empty_log(source_file) {
  var fs = require('fs');
  if (!fs.existsSync(source_file))
    return "ERROR: File does not exist.";
  try {
    // BUG FIX: the original used async fs.writeFile and returned the
    // error inside the callback, where it was silently lost. Truncate
    // synchronously so failures actually reach the caller.
    fs.writeFileSync(source_file, "");
  } catch (err) {
    return err;
  }
}
/**
 * Record the time of the last upload in the timestamp log.
 * @param
 *   log: Path of the timestamp log file.
 * @return
 *   undefined — directory creation and the write are asynchronous, so
 *   errors are logged rather than returned (the original returned them
 *   inside callbacks, where they were silently lost).
 **/
function log_timestamp(log) {
  var fs = require('fs');
  var time = require('moment');
  var arr = log.split("/");
  var dir = arr.slice(0, arr.length - 1).join("/");
  var mkdirp = require('mkdirp');
  mkdirp(dir, function(err) {
    if (err) {
      console.error("ERROR: could not create directory:", err);
      return;
    }
    // BUG FIX: the original called fs.writeFile *outside* this callback,
    // racing with the asynchronous directory creation; write only once
    // the directory is known to exist.
    fs.writeFile(log, time().format(), function (err) {
      if (err)
        console.error("ERROR: could not write timestamp:", err);
    });
  });
}
// Create a directory (and any missing parents) via mkdirp.
// NOTE(review): creation is asynchronous and errors returned from the
// callback are discarded by the caller.
function create_dir(dir) {
  require('mkdirp')(dir, function (err) {
    if (err) return err;
  });
}
/**
 * Get the timestamp of the last upload, initialising the log file with
 * the current time when it does not exist yet.
 * @param
 *   log: Log file holding the upload timestamp.
 * @return
 *   The stored (or freshly written) timestamp string.
 **/
function get_timestamp(log) {
  var fs = require('fs');
  if (!fs.existsSync(log)) {
    var arr = log.split("/");
    var dir = arr.slice(0, arr.length - 1).join("/");
    create_dir(dir);
    var time = require('moment');
    // Capture the timestamp once so the stored and returned values match
    // (the original formatted twice, which could straddle a second).
    var now = time().format();
    fs.appendFileSync(log, now);
    return now;
  }
  // BUG FIX: read as utf8 so both paths return a string (the original
  // returned a Buffer here but a string on the first call).
  return fs.readFileSync(log, 'utf8');
}
/**
 * Log new table changes and upload the accumulated log file to S3 once
 * the upload threshold has elapsed since the previous upload.
 * @param
 *   dir: Directory of log file.
 *   file_pref: Name prefix of uploaded log file.
 *   file_format: File format of uploaded file (csv/sql).
 *   lines: Data to be logged.
 * @return
 *   undefined.
 **/
function manage_logs(dir, file_pref, file_format, lines) {
  var fs = require('fs');
  var time = require('moment');
  var time_format = "YYYY-MM-DD HH:mm:ss";
  // Template literals replace the third-party sprintf dependency.
  var timestamp_log = `./${dir}/${TIMESTAMP_LOG}`;
  var timestamp_str = get_timestamp(timestamp_log);
  var timestamp = time(timestamp_str, time_format);
  console.log(`\n### Logs Info for ${dir} ###`);
  console.log("Last Upload Time: " + timestamp.format());
  console.log("Local Time: " + time().format());
  // Log data to file.
  console.log('Logging generated data...');
  log_changes(dir, file_pref, lines);
  // Upload only when at least UPLOAD_THRESHOLD ms have passed since the
  // last recorded upload.
  if (time() - timestamp >= UPLOAD_THRESHOLD) {
    console.log('Uploading files to bucket...');
    var source_file = `${dir}/${file_pref}.log`;
    var dest_file = `${dir}/${file_pref}.${file_format}`;
    console.log("Source File: ", source_file);
    console.log("Destination File: ", dest_file);
    upload_to_bucket(source_file, dest_file);
    empty_log(source_file);
    log_timestamp(timestamp_log);
  }
}
// Event handler
exports.lambda_handler = function(event, context) {
var queries = [];
var table_insert_csv = {};
event.Records.forEach(function(record) {
var query = "";
// Extract table name
var table_name = extract_table_name(record).toLowerCase();
context.succeed('Event Name: ', record.eventName);
if (record.eventName == event_state['INSERT']) {
if (typeof table_insert_csv[table_name] === 'undefined' ) {
table_insert_csv[table_name] = [];
}
var csv_record = exports.create_insert_csv(record);
context.succeed("Created CSV record: ", csv_record);
table_insert_csv[table_name].push(csv_record);
}
if (record.eventName == event_state['UPDATE']) {
query = exports.generate_update_query(record);
context.succeed("Event SQL Query: %s", query ? query : '/');
queries.push(query);
}
if (record.eventName == event_state['DELETE']) {
query = exports.generate_delete_query(record, table_name);
context.succeed("Event SQL Query: %s", query ? query : '/');
queries.push(query);
}
context.succeed("######\n");
});
context.succeed("Processed " + event.Records.length + " records.");
if (queries.length) {
context.succeed("Generated " + queries.length + " queries.");
// Manage generated queries
console.log('\n--Logging data for SQL queries:');
manage_logs('sql', 'sql', 'sql', queries);
}
if (Object.keys(table_insert_csv).length) {
context.succeed("Generated CSV files for " + Object.keys(table_insert_csv).length + " tables.");
console.log('\n--Logging data for CSV files:');
Object.keys(table_insert_csv).forEach(function(table) {
console.log('----Logging CSV file for table: ', table);
manage_logs('csv/'+table, table, 'csv', table_insert_csv[table]);
});
}
};
exports.stream_handler = function(streams, context) {
var queries = [];
var table_insert_csv = {};
streams.forEach(function (record) {
var query = "";
// Extract table name
var table_name = "range_test"; //FIXME: Get from record
context.succeed('Event Name: ', record.EventName);
if (record.EventName == event_state['INSERT']) {
if (typeof table_insert_csv[table_name] === 'undefined' ) {
table_insert_csv[table_name] = [];
}
var csv_record = exports.create_insert_csv(record, table_name);
context.succeed("Created CSV record: ", csv_record);
table_insert_csv[table_name].push(csv_record);
}
if (record.EventName == event_state['UPDATE']) {
query = exports.generate_update_query(record, table_name);
context.succeed("Event SQL Query: %s", query ? query : '/');
queries.push(query);
}
if (record.EventName == event_state['DELETE']) {
query = exports.generate_delete_query(record, table_name);
context.succeed("Event SQL Query: %s", query ? query : '/');
queries.push(query);
}
context.succeed("######\n");
});
context.succeed("Processed " + streams.length + " records.");
if (queries.length) {
context.succeed("Generated " + queries.length + " queries.");
// Manage generated queries
console.log('\n--Logging data for SQL queries:');
manage_logs('sql', 'sql', 'sql', queries);
}
if (Object.keys(table_insert_csv).length) {
context.succeed("Generated CSV files for " + Object.keys(table_insert_csv).length + " tables.");
console.log('\n--Logging data for CSV files:');
Object.keys(table_insert_csv).forEach(function(table) {
console.log('----Logging CSV file for table: ', table);
manage_logs('csv/'+table, table, 'csv', table_insert_csv[table]);
});
}
};
// Build a minimal stand-in for the Lambda context object, routing
// succeed/failed messages to the console (used for local testing).
function getContext() {
  var ctx = {};
  ctx.succeed = console.info;
  ctx.failed = console.error;
  return ctx;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment