Skip to content

Instantly share code, notes, and snippets.

@meaku
Created August 4, 2015 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save meaku/4c724ec4430a9d2a805d to your computer and use it in GitHub Desktop.
Save meaku/4c724ec4430a9d2a805d to your computer and use it in GitHub Desktop.
A simple dynamodb stream for bunyan logging each line
"use strict";
var util = require("util");
var Writable = require("stream").Writable;
var model = require("../db/dynamodb");
/**
* Log Stream
*
* Uploads items using a readableStream
* Uses multiUpsert internally
*
* Does not cache elements before calling the database
* to ensure all writes to the stream make it to the database
*
*
* @param {Function} upsert
* @param {String} tableName
* @constructor
*/
function LogStream(upsert, tableName) {
var self = this;
Writable.call(this, {objectMode: true, highWaterMark: 1});
this.upsert = upsert;
this.dbReady = model.status(tableName)
.then(function () {
return model.active(tableName);
})
.catch(function (err) {
self.emit("error", err);
});
}
util.inherits(LogStream, Writable);
/**
* cache chunk and write to db as soon as we reached 25 chunks
* the callback does not necessarily mean that the item was written
* but rather that it has been cached
*
* @param chunk
* @param encoding
* @param callback
* @private
*/
LogStream.prototype._write = function (chunk, encoding, callback) {
var self = this;
//transform to ISODate
chunk.timestamp = new Date(chunk.time).toISOString();
delete chunk.time;
this.dbReady.then(function () {
return self.upsert(chunk)
.then(function () {
callback(null);
})
.catch(callback);
});
};
module.exports = function (options) {
options.tableName = options.tableName || "log";
var logStream = new LogStream(model.table(options.tableName).upsert, options.tableName);
logStream.on("error", function(err) {
console.error("Could not create bunyanDynamoStream: Make sure '" + options.tableName + "' exists.", err);
});
return logStream;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment