Created
October 2, 2011 19:58
-
-
Save nathanmarz/1257861 to your computer and use it in GitHub Desktop.
Direct stream example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DirectBoltExample implements IBasicBolt { | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
int out = tuple.getInteger(0); | |
// We will use this map to keep track of how many tuples we've sent to each | |
// task. After we finish emitting tuples, we will emit to each task how many tuples | |
// we sent to them on the direct stream | |
// | |
// You can, of course, keep this map as an instance variable of the bolt if you want to | |
// coordinate across multiple input tuples. This is how CoordinatedBolt works | |
Map<Integer, Integer> outCounts = new HashMap<Integer, Integer>(); | |
for(int i=0; i < out; i++) { | |
// collector.emit returns the task ids that the tuple was sent to | |
List<Integer> tasks = collector.emit(1, new Values(i)); | |
for(int task: tasks) { | |
Integer curr = outCounts.get(task); | |
if(curr==null) curr = 0; | |
outCounts.put(task, curr + 1); | |
} | |
} | |
for(int task: outCounts.keySet()) { | |
// tuples are emitted to a direct stream using the emitDirect method | |
// the first argument to emitDirect is the task id | |
collector.emitDirect(task, 2, new Values(outCounts.get(task))); | |
} | |
} | |
@Override | |
public void cleanup() { | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declareStream(1, new Fields("output")); | |
// Streams need to be explicitly declared as direct by saying "true" | |
// in the second parameter here | |
declarer.declareStream(2, true, new Fields("count")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment