Skip to content

@stonegao /gist:1266312 forked from nathanmarz/gist:1257861
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Direct stream example
/**
 * Example bolt demonstrating how to emit on a direct stream.
 *
 * <p>For every input tuple the bolt reads an integer {@code n} from field 0, emits the values
 * {@code 0 .. n-1} on the regular output stream, and then uses the direct stream to tell each
 * task that received at least one of those tuples how many it was sent.
 */
public class DirectBoltExample implements IBasicBolt {
    /** Id of the regular stream that carries the generated values (field "output"). */
    private static final int OUTPUT_STREAM_ID = 1;

    /** Id of the direct stream that carries the per-task tuple counts (field "count"). */
    private static final int COUNT_STREAM_ID = 2;

    /** No setup is needed for this example. */
    @Override
    public void prepare(Map conf, TopologyContext context) {
    }

    /**
     * Emits {@code n} tuples on the regular stream, then emits to each receiving task, on the
     * direct stream, the number of tuples that task was sent.
     *
     * @param tuple input tuple; field 0 holds the number of tuples to emit
     * @param collector collector used for both the regular and the direct emits
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        int out = tuple.getInteger(0);

        // Tracks how many tuples were sent to each task while processing this input tuple.
        //
        // This map could instead be kept as an instance variable of the bolt in order to
        // coordinate across multiple input tuples. This is how CoordinatedBolt works.
        Map<Integer, Integer> outCounts = new HashMap<Integer, Integer>();

        for (int i = 0; i < out; i++) {
            // collector.emit returns the task ids that the tuple was sent to
            List<Integer> tasks = collector.emit(OUTPUT_STREAM_ID, new Values(i));
            for (int task : tasks) {
                Integer curr = outCounts.get(task);
                if (curr == null) {
                    curr = 0;
                }
                outCounts.put(task, curr + 1);
            }
        }

        // Iterate over entries so each task id and its count are read together, without a
        // second map lookup per task.
        for (Map.Entry<Integer, Integer> entry : outCounts.entrySet()) {
            int task = entry.getKey();
            // Tuples are emitted to a direct stream using the emitDirect method;
            // the first argument to emitDirect is the task id.
            collector.emitDirect(task, COUNT_STREAM_ID, new Values(entry.getValue()));
        }
    }

    /** No resources to release for this example. */
    @Override
    public void cleanup() {
    }

    /**
     * Declares the regular "output" stream and the direct "count" stream.
     *
     * @param declarer declarer on which both streams are registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(OUTPUT_STREAM_ID, new Fields("output"));
        // Streams need to be explicitly declared as direct by saying "true"
        // in the second parameter here
        declarer.declareStream(COUNT_STREAM_ID, true, new Fields("count"));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.