Skip to content

Instantly share code, notes, and snippets.

@sjoerdmulder
Created September 8, 2012 20:31
Show Gist options
  • Save sjoerdmulder/3679478 to your computer and use it in GitHub Desktop.
Save sjoerdmulder/3679478 to your computer and use it in GitHub Desktop.
TridentTopology failing tuples
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.trident.TridentTopology;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Small reproduction harness: feeds a Trident topology from a plain rich spout and
 * prints, per emitted "sequence", whether all of its tuples were acked or all were failed.
 *
 * <p>Every tuple is emitted with the message id {@code "<sequence>-<index>"}. The spout counts
 * the {@code ack} and {@code fail} callbacks per sequence and reports on standard output when
 * a sequence has collected {@code MAX_PER_SEQUENCE} callbacks of one kind.
 */
public class TridentTest {

    /**
     * Builds a topology with a single stream fed by {@link SequenceTrackingSpout} and submits
     * it to an in-process {@link LocalCluster}.
     *
     * @param args ignored
     * @throws Exception propagated from topology construction or submission
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(10);
        conf.setMessageTimeoutSecs(3);

        TridentTopology topology = new TridentTopology();
        // Keep the static type BaseRichSpout so the same newStream overload is selected as before.
        BaseRichSpout spout = new SequenceTrackingSpout();
        topology.newStream("stream", spout);

        // NOTE(review): no shutdown is requested; presumably the run is ended by killing the process.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("trident-test", conf, topology.build());
    }

    /**
     * Spout that emits the same sentence over and over, grouped into numbered sequences of
     * {@link #MAX_PER_SEQUENCE} tuples, and tallies ack/fail callbacks per sequence.
     *
     * <p>NOTE(review): the tallies use read-then-write updates, which is only safe if the
     * callbacks are invoked from a single thread - confirm against the Storm spout contract.
     */
    private static final class SequenceTrackingSpout extends BaseRichSpout {

        /** Number of tuples emitted under one sequence number. */
        private static final int MAX_PER_SEQUENCE = 693;

        /** Every this-many calls to {@link #nextTuple()} the spout sleeps instead of emitting. */
        private static final int PAUSE_EVERY_N_CALLS = 100;

        /** Length of that sleep, in milliseconds. */
        private static final int PAUSE_MILLIS = 50;

        /** Separator between sequence number and index inside a message id. */
        private static final String MSG_ID_SEPARATOR = "-";

        /** Acks seen so far, keyed by sequence number; an entry is removed once complete. */
        private final Map<Integer, Integer> countingAck = new ConcurrentHashMap<Integer, Integer>();

        /** Fails seen so far, keyed by sequence number; an entry is removed once complete. */
        private final Map<Integer, Integer> countingFail = new ConcurrentHashMap<Integer, Integer>();

        /** Collector handed over in {@link #open}; used for every emit. */
        private SpoutOutputCollector collector;

        /** Total number of {@link #nextTuple()} invocations, including the sleeping ones. */
        private int calls = 0;

        /** Position of the next tuple within the current sequence. */
        private int index = 0;

        /** Current sequence number; first component of each message id. */
        private int sequence = 0;

        /** Declares the single output field {@code "test"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("test"));
        }

        /** Stores the collector for later emits; {@code conf} and {@code context} are unused. */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Emits one tuple with message id {@code "<sequence>-<index>"}, except on every
         * {@link #PAUSE_EVERY_N_CALLS}-th call (starting with the very first), which sleeps
         * for {@link #PAUSE_MILLIS} ms and emits nothing.
         */
        @Override
        public void nextTuple() {
            // Roll over to the next sequence before deciding whether to emit.
            if (index >= MAX_PER_SEQUENCE) {
                index = 0;
                sequence++;
            }
            if (calls % PAUSE_EVERY_N_CALLS == 0) {
                Utils.sleep(PAUSE_MILLIS);
            } else {
                collector.emit(new Values("this is some sentence"), sequence + MSG_ID_SEPARATOR + index);
                index++;
            }
            calls++;
        }

        /** Counts one ack for the sequence encoded in {@code msgId}. */
        @Override
        public void ack(Object msgId) {
            tally(msgId, countingAck, "Acking", "ack");
        }

        /** Counts one fail for the sequence encoded in {@code msgId}. */
        @Override
        public void fail(Object msgId) {
            tally(msgId, countingFail, "Failing", "fail");
        }

        /**
         * Shared bookkeeping for {@link #ack} and {@link #fail}: bumps the counter of the
         * message's sequence and, once it reaches {@link #MAX_PER_SEQUENCE}, drops the entry
         * and reports it. Any entries still left in the same map at that moment are printed
         * as sequences that did not complete.
         *
         * @param msgId  message id as emitted by {@link #nextTuple()}
         * @param counts the ack or the fail counter map
         * @param gerund word used in the completion line ("Acking" / "Failing")
         * @param verb   word used in the warning line ("ack" / "fail")
         */
        private void tally(Object msgId, Map<Integer, Integer> counts, String gerund, String verb) {
            Integer seq = sequenceOf(msgId);
            if (increment(counts, seq) != MAX_PER_SEQUENCE) {
                return;
            }
            counts.remove(seq);
            System.out.println(gerund + " sequence: " + seq);
            if (!counts.isEmpty()) {
                System.out.println("Some sequence didn't " + verb + " completly!! " + counts.toString());
            }
        }

        /**
         * Extracts the sequence number, i.e. the text before the first separator.
         *
         * @throws ClassCastException    if {@code msgId} is not a {@code String}
         * @throws NumberFormatException if the leading component is not an integer
         */
        private static Integer sequenceOf(Object msgId) {
            String[] parts = ((String) msgId).split(MSG_ID_SEPARATOR);
            return Integer.parseInt(parts[0]);
        }

        /**
         * Adds one to the counter stored under {@code key} (treating a missing entry as zero)
         * and returns the new value. A single lookup is used so a missing entry can never be
         * unboxed.
         */
        private static int increment(Map<Integer, Integer> counts, Integer key) {
            Integer previous = counts.get(key);
            int updated = (previous == null) ? 1 : previous + 1;
            counts.put(key, updated);
            return updated;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment