public
Created — forked from nathanmarz/gist:1257861

Direct stream example

  • Download Gist
gistfile1.txt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
public class DirectBoltExample implements IBasicBolt {
 
@Override
public void prepare(Map conf, TopologyContext context) {
}
 
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
int out = tuple.getInteger(0);
// We will use this map to keep track of how many tuples we've sent to each
// task. After we finish emitting tuples, we will emit to each task how many tuples
// we sent to them on the direct stream
//
// You can, of course, keep this map as an instance variable of the bolt if you want to
// coordinate across multiple input tuples. This is how CoordinatedBolt works
Map<Integer, Integer> outCounts = new HashMap<Integer, Integer>();
for(int i=0; i < out; i++) {
// collector.emit returns the task ids that the tuple was sent to
List<Integer> tasks = collector.emit(1, new Values(i));
for(int task: tasks) {
Integer curr = outCounts.get(task);
if(curr==null) curr = 0;
outCounts.put(task, curr + 1);
}
}
for(int task: outCounts.keySet()) {
// tuples are emitted to a direct stream using the emitDirect method
// the first argument to emitDirect is the task id
collector.emitDirect(task, 2, new Values(outCounts.get(task)));
}
}
 
@Override
public void cleanup() {
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(1, new Fields("output"));
// Streams need to be explicitly declared as direct by saying "true"
// in the second parameter here
declarer.declareStream(2, true, new Fields("count"));
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.