public class DirectBoltExample implements IBasicBolt { | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
int out = tuple.getInteger(0); | |
// We will use this map to keep track of how many tuples we've sent to each | |
// task. After we finish emitting tuples, we will emit to each task how many tuples | |
// we sent to them on the direct stream | |
// | |
// You can, of course, keep this map as an instance variable of the bolt if you want to | |
// coordinate across multiple input tuples. This is how CoordinatedBolt works | |
Map<Integer, Integer> outCounts = new HashMap<Integer, Integer>(); | |
for(int i=0; i < out; i++) { | |
// collector.emit returns the task ids that the tuple was sent to | |
List<Integer> tasks = collector.emit(1, new Values(i)); | |
for(int task: tasks) { | |
Integer curr = outCounts.get(task); | |
if(curr==null) curr = 0; | |
outCounts.put(task, curr + 1); | |
} | |
} | |
for(int task: outCounts.keySet()) { | |
// tuples are emitted to a direct stream using the emitDirect method | |
// the first argument to emitDirect is the task id | |
collector.emitDirect(task, 2, new Values(outCounts.get(task))); | |
} | |
} | |
@Override | |
public void cleanup() { | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declareStream(1, new Fields("output")); | |
// Streams need to be explicitly declared as direct by saying "true" | |
// in the second parameter here | |
declarer.declareStream(2, true, new Fields("count")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment