Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daluu/d1e667d946c9c853aff332aca335fafc to your computer and use it in GitHub Desktop.
Save daluu/d1e667d946c9c853aff332aca335fafc to your computer and use it in GitHub Desktop.
Sample bolt showing how to initialize a bolt and read in Apache Storm topology configuration and process tuple in node.js and how to unit test it
#! /usr/bin/env node
'use strict';
var storm = require("pathTo/storm.js"); //the official storm multilang node client/binding from Apache Storm project
var BasicBolt = storm.BasicBolt;
TestBolt.prototype = Object.create(BasicBolt.prototype);
TestBolt.prototype.constructor = TestBolt;
function TestBolt() {
BasicBolt.call(this);
//any additional variable/function initialization here
//this code gets called before initialize function
};
TestBolt.prototype.initialize = function(conf, context, done) {
//this initialization is called/done after the constructor
this.configurationParameterName = typeof conf['configurationParameterName'] !== 'undefined' ? conf['configurationParameterName'] : "some default value";
done();
} //no need to define an initialize for BasicBolt itself unless you want/need to
TestBolt.prototype.process = function(tuple, done) {
//your normal bolt code here...
if(tuple.isTickTuple()){
//what to do on tick interval? e.g. cache cleanup, batch processing, etc.
}
if(tuple.isHeartbeatTuple()){
//what to do on heartbeats? e.g. health checks
}
console.log("Processing tuple, which has value: %s\n",JSON.stringify(tuple.values)); //tuple.values is an array of data
done(); // `done` must be called to ack (processing the tuple, not automatic. think of it like a "return" statement)
//return; //in case done() doesn't abort tuple processing at this point (in scenarios where you have branching logic)
};
var bolt = new TestBolt();
bolt.run();
#! /usr/bin/env node
var chai = require('chai');
var expect = chai.expect; // we are using the "expect" style of Chai
var SUT = require('pathTo/yourBoltUnderTest.js');
var storm = require("pathTo/storm.js"); //the official storm multilang node client/binding from Apache Storm project
var fs = require('fs');
var path = require('path');
var capturedStdout = "";
describe('YourBoltUnderTest', function() {
it('does something', function() {
//unless one has better way of unit testing I/O of the bolt, we capture/redirect I/O to validate
//also looking for a way to mock any node.js module call within bolt to avoid external dependencies
var intercept = require("intercept-stdout"), capturedStdout = "";
var unhook_intercept = intercept(function(txt) { capturedStdout += txt; });
var YourBoltUnderTest = new SUT.YourBoltUnderTestBolt();
var cfg = {}, context = {}, done = function() { return; };
YourBoltUnderTest.initialize(cfg,context,done);
var input = fs.readFileSync(path.resolve(__dirname, "input/data.json"),{'encoding':'utf8'});
var tup = new storm.Tuple(1,"default","default",1,["data-field1",input]);
YourBoltUnderTest.process(tup,done); //repeat this type of call to simulate processing multiple messages from stream
unhook_intercept();
//strip out the "end" string from storm output/messages
var output = capturedStdout.replace(new RegExp('end', 'g'),'').trim().split("\n");
//in this example, we always emit as the last action per process event in the bolt, so it would be the last line of output
var result = JSON.parse(output[output.length-1]);
result = JSON.parse(result.tuple[0]); //we only have 1 output tuple in example, could be more
expect(result.someFieldOfOutputTupleDataDefinedInYourBolt).to.equal("your expected value");
});
it('properly pulls in storm topology configuration', function() {
var YourBoltUnderTest = new SUT.YourBoltUnderTestBolt();
var cfg = {}, context = {}, done = function() { return; };
cfg['configurationParameterName'] = 123;
YourBoltUnderTest.initialize(cfg,context,done);
//validate defined topology config that overrides defaults
expect(YourBoltUnderTest.configurationParameterName).to.equal(123);
//validate default config in bolt when not overridden with topology config
expect(YourBoltUnderTest.configurationParameterName2).to.equal("abc");
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment