Skip to content
Create a gist now

Instantly share code, notes, and snippets.

Embed URL


Subversion checkout URL

You can clone with
Download ZIP
Direct stream example
public class DirectBoltExample implements IBasicBolt {
public void prepare(Map conf, TopologyContext context) {
public void execute(Tuple tuple, BasicOutputCollector collector) {
int out = tuple.getInteger(0);
// We will use this map to keep track of how many tuples we've sent to each
// task. After we finish emitting tuples, we will emit to each task how many tuples
// we sent to them on the direct stream
// You can, of course, keep this map as an instance variable of the bolt if you want to
// coordinate across multiple input tuples. This is how CoordinatedBolt works
Map<Integer, Integer> outCounts = new HashMap<Integer, Integer>();
for(int i=0; i < out; i++) {
// collector.emit returns the task ids that the tuple was sent to
List<Integer> tasks = collector.emit(1, new Values(i));
for(int task: tasks) {
Integer curr = outCounts.get(task);
if(curr==null) curr = 0;
outCounts.put(task, curr + 1);
for(int task: outCounts.keySet()) {
// tuples are emitted to a direct stream using the emitDirect method
// the first argument to emitDirect is the task id
collector.emitDirect(task, 2, new Values(outCounts.get(task)));
public void cleanup() {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(1, new Fields("output"));
// Streams need to be explicitly declared as direct by saying "true"
// in the second parameter here
declarer.declareStream(2, true, new Fields("count"));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.