Last active
October 31, 2018 06:12
-
-
Save daluu/d1e667d946c9c853aff332aca335fafc to your computer and use it in GitHub Desktop.
Sample bolt showing how to initialize a bolt and read in Apache Storm topology configuration and process tuple in node.js and how to unit test it
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env node | |
'use strict'; | |
var storm = require("pathTo/storm.js"); //the official storm multilang node client/binding from Apache Storm project | |
var BasicBolt = storm.BasicBolt; | |
TestBolt.prototype = Object.create(BasicBolt.prototype); | |
TestBolt.prototype.constructor = TestBolt; | |
function TestBolt() { | |
BasicBolt.call(this); | |
//any additional variable/function initialization here | |
//this code gets called before initialize function | |
}; | |
TestBolt.prototype.initialize = function(conf, context, done) { | |
//this initialization is called/done after the constructor | |
this.configurationParameterName = typeof conf['configurationParameterName'] !== 'undefined' ? conf['configurationParameterName'] : "some default value"; | |
done(); | |
} //no need to define an initialize for BasicBolt itself unless you want/need to | |
TestBolt.prototype.process = function(tuple, done) { | |
//your normal bolt code here... | |
if(tuple.isTickTuple()){ | |
//what to do on tick interval? e.g. cache cleanup, batch processing, etc. | |
} | |
if(tuple.isHeartbeatTuple()){ | |
//what to do on heartbeats? e.g. health checks | |
} | |
console.log("Processing tuple, which has value: %s\n",JSON.stringify(tuple.values)); //tuple.values is an array of data | |
done(); // `done` must be called to ack (processing the tuple, not automatic. think of it like a "return" statement) | |
//return; //in case done() doesn't abort tuple processing at this point (in scenarios where you have branching logic) | |
}; | |
var bolt = new TestBolt(); | |
bolt.run(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env node | |
var chai = require('chai'); | |
var expect = chai.expect; // we are using the "expect" style of Chai | |
var SUT = require('pathTo/yourBoltUnderTest.js'); | |
var storm = require("pathTo/storm.js"); //the official storm multilang node client/binding from Apache Storm project | |
var fs = require('fs'); | |
var path = require('path'); | |
var capturedStdout = ""; | |
describe('YourBoltUnderTest', function() { | |
it('does something', function() { | |
//unless one has better way of unit testing I/O of the bolt, we capture/redirect I/O to validate | |
//also looking for a way to mock any node.js module call within bolt to avoid external dependencies | |
var intercept = require("intercept-stdout"), capturedStdout = ""; | |
var unhook_intercept = intercept(function(txt) { capturedStdout += txt; }); | |
var YourBoltUnderTest = new SUT.YourBoltUnderTestBolt(); | |
var cfg = {}, context = {}, done = function() { return; }; | |
YourBoltUnderTest.initialize(cfg,context,done); | |
var input = fs.readFileSync(path.resolve(__dirname, "input/data.json"),{'encoding':'utf8'}); | |
var tup = new storm.Tuple(1,"default","default",1,["data-field1",input]); | |
YourBoltUnderTest.process(tup,done); //repeat this type of call to simulate processing multiple messages from stream | |
unhook_intercept(); | |
//strip out the "end" string from storm output/messages | |
var output = capturedStdout.replace(new RegExp('end', 'g'),'').trim().split("\n"); | |
//in this example, we always emit as the last action per process event in the bolt, so it would be the last line of output | |
var result = JSON.parse(output[output.length-1]); | |
result = JSON.parse(result.tuple[0]); //we only have 1 output tuple in example, could be more | |
expect(result.someFieldOfOutputTupleDataDefinedInYourBolt).to.equal("your expected value"); | |
}); | |
it('properly pulls in storm topology configuration', function() { | |
var YourBoltUnderTest = new SUT.YourBoltUnderTestBolt(); | |
var cfg = {}, context = {}, done = function() { return; }; | |
cfg['configurationParameterName'] = 123; | |
YourBoltUnderTest.initialize(cfg,context,done); | |
//validate defined topology config that overrides defaults | |
expect(YourBoltUnderTest.configurationParameterName).to.equal(123); | |
//validate default config in bolt when not overridden with topology config | |
expect(YourBoltUnderTest.configurationParameterName2).to.equal("abc"); | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment