Skip to content

Instantly share code, notes, and snippets.

@abh1nav
Created March 16, 2012 12:39
Show Gist options
  • Save abh1nav/2049902 to your computer and use it in GitHub Desktop.
Save abh1nav/2049902 to your computer and use it in GitHub Desktop.
A Storm bolt (S3) that collects tuples from bolts S1 and S2 and runs its logic for a message only once both tuples for that message ID have been received.
package com.twitsprout.sproutscore.bolts;
import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.Maps;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/**
 * Join-style bolt: buffers the tuples produced by bolts S1 and S2 and runs the
 * S3 logic for a message only once both tuples carrying the same
 * {@code messageId} field value have been received.
 *
 * <p>Every incoming tuple is acked as soon as it has been handled, i.e. before
 * the matching tuple from the other bolt has necessarily arrived.
 * NOTE(review): this presumably means a buffered tuple is not replayed if this
 * task dies before its partner arrives - confirm this is acceptable.
 *
 * <p>A pending entry is removed only when its pair completes; a message for
 * which only one of the two tuples ever arrives stays buffered for the lifetime
 * of the task. NOTE(review): consider a time- or size-based eviction if that
 * can happen in your topology.
 */
public class S3 implements IRichBolt {

    /** Component id of the bolt whose tuples fill the {@code s1} slot. */
    private static final String S1_COMPONENT_ID = "BoltIDofS1";

    /** Component id of the bolt whose tuples fill the {@code s2} slot. */
    private static final String S2_COMPONENT_ID = "BoltIDofS2";

    /** Name of the tuple field used to pair an S1 tuple with an S2 tuple. */
    private static final String MESSAGE_ID_FIELD = "messageId";

    /**
     * Holds the tuples generated by S1 and S2 for a single message id.
     *
     * <p>Declared {@code static} so that instances do not carry a hidden
     * reference to the enclosing bolt.
     */
    private static final class InputCollector {
        private Tuple s1;
        private Tuple s2;

        /** @return {@code true} once both the S1 and the S2 tuple are present */
        private boolean isComplete() {
            return s1 != null && s2 != null;
        }
    }

    private OutputCollector collector;

    /** Partially received pairs, keyed by message id. Created in {@link #prepare}. */
    private Map<String, InputCollector> inputs;

    /**
     * Stores the output collector and creates an empty pending-pairs map.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to ack incoming tuples
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.inputs = Maps.newHashMap();
    }

    /**
     * Records the incoming tuple in the slot matching its source component and,
     * when both slots for its message id are filled, runs the S3 logic and drops
     * the pending entry. The tuple is acked in every case.
     *
     * <p>Tuples from any other component are acked and otherwise ignored; no
     * pending entry is created for them, so they cannot leave behind an entry
     * that would never be removed.
     *
     * @param input tuple from S1 or S2; must carry a String {@code messageId} field
     */
    @Override
    public void execute(Tuple input) {
        String source = input.getSourceComponent();
        boolean fromS1 = S1_COMPONENT_ID.equals(source);
        boolean fromS2 = S2_COMPONENT_ID.equals(source);

        if (!fromS1 && !fromS2) {
            // Not one of the two joined streams: nothing to buffer.
            this.collector.ack(input);
            return;
        }

        String messageId = (String) input.getValueByField(MESSAGE_ID_FIELD);
        InputCollector ic = getOrCreateInputCollector(messageId);
        if (fromS1) {
            ic.s1 = input;
        } else {
            ic.s2 = input;
        }

        if (ic.isComplete()) {
            /*
             * Run your code for S3
             * now that you have both tuples from S1 and S2
             * for the given message id (ic.s1 and ic.s2).
             */

            // Remove the completed pair from the map so it does not accumulate.
            this.inputs.remove(messageId);
        }
        this.collector.ack(input);
    }

    /**
     * Returns the pending entry for {@code messageId}, creating and registering
     * an empty one if none exists yet.
     *
     * @param messageId value of the tuple's {@code messageId} field
     * @return the existing or newly created entry, never {@code null}
     */
    private InputCollector getOrCreateInputCollector(String messageId) {
        // Entries are never stored as null, so a single get() is enough.
        InputCollector ic = this.inputs.get(messageId);
        if (ic == null) {
            ic = new InputCollector();
            this.inputs.put(messageId, ic);
        }
        return ic;
    }

    /** No resources to release. */
    @Override
    public void cleanup() {}

    /** This bolt declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
@abh1nav
Copy link
Author

abh1nav commented Mar 16, 2012

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment