Skip to content

Instantly share code, notes, and snippets.

@stonegao
Forked from nathanmarz/gist:1257861
Created October 6, 2011 02:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stonegao/1266312 to your computer and use it in GitHub Desktop.
Save stonegao/1266312 to your computer and use it in GitHub Desktop.
Direct stream example
/**
 * Example bolt demonstrating a direct stream.
 *
 * <p>For each input tuple, the integer in field 0 is read and that many tuples
 * (carrying the values 0, 1, 2, ...) are emitted on stream 1. The bolt records how
 * many of those tuples went to each receiving task and then reports every task's
 * total to that task on stream 2, which is declared as a direct stream.
 */
public class DirectBoltExample implements IBasicBolt {

    /** No setup is needed for this example. */
    @Override
    public void prepare(Map conf, TopologyContext context) {
    }

    /**
     * Fans the input out on stream 1, then sends each receiving task its tuple count
     * on direct stream 2.
     *
     * @param tuple     input tuple; field 0 holds the number of tuples to emit
     * @param collector collector used for both the regular and the direct emits
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        int tupleCount = tuple.getInteger(0);

        // Per-task tally of tuples sent while handling this one input tuple.
        // The tally lives only for the duration of this call. To coordinate across
        // several input tuples it could be held as an instance field instead,
        // which is how CoordinatedBolt works.
        Map<Integer, Integer> sentPerTask = new HashMap<Integer, Integer>();

        for (int value = 0; value < tupleCount; value++) {
            // emit hands back the ids of the tasks that received the tuple.
            List<Integer> recipients = collector.emit(1, new Values(value));
            for (int taskId : recipients) {
                Integer soFar = sentPerTask.get(taskId);
                sentPerTask.put(taskId, soFar == null ? 1 : soFar + 1);
            }
        }

        // Direct-stream emits go through emitDirect, whose first argument is the
        // id of the task that should receive the tuple.
        for (Map.Entry<Integer, Integer> tally : sentPerTask.entrySet()) {
            int taskId = tally.getKey();
            collector.emitDirect(taskId, 2, new Values(tally.getValue()));
        }
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Declares stream 1 (field "output") as a regular stream and stream 2
     * (field "count") as a direct stream.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(1, new Fields("output"));
        // A stream is direct only when declared so explicitly: the boolean
        // second argument must be true.
        declarer.declareStream(2, true, new Fields("count"));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment