Skip to content

Instantly share code, notes, and snippets.

@smly
Created November 6, 2013 02:25
Show Gist options
  • Save smly/7329917 to your computer and use it in GitHub Desktop.
Save smly/7329917 to your computer and use it in GitHub Desktop.
DropoutTopology.java
package sandbox;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
public class DropoutTopology {
public static class Message {
String msg;
Integer msgId;
Message(String msg_, Integer msgId_) {
msg = msg_;
msgId = msgId_;
}
}
public static class TestSpout extends BaseRichSpout {
static final long serialVersionUID = 10109412L;
SpoutOutputCollector _collector;
LinkedList<Message> queue;
final String[] sentences = new String[]{
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature" };
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
queue = new LinkedList<Message>();
Integer msgId = 0;
for (String sentence : sentences) {
queue.add(new Message(sentence, msgId));
msgId++;
}
}
@Override
public void nextTuple() {
if (! queue.isEmpty()) {
Message msg = queue.pop();
_collector.emit(new Values(msg.msg), msg.msgId);
Utils.sleep(100);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("sentence"));
}
@Override
public void ack(Object msgId_) {
Integer msgId = (Integer) msgId_;
System.out.println(">>> TestSpout.ack " + sentences[msgId.intValue()]);
}
@Override
public void fail(Object msgId_) {
Integer msgId = (Integer) msgId_;
System.out.println(">>> TestSpout.fail " + sentences[msgId.intValue()]);
queue.add(new Message(sentences[msgId.intValue()], msgId.intValue()));
}
}
public static class DropoutBolt extends BaseRichBolt {
static final long serialVersionUID = 10109411L;
OutputCollector _collector;
Random _rand;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
_rand = new Random();
}
@Override
public void execute(Tuple tuple) {
if (_rand.nextInt(10) < 5) {
_collector.emit(new Values(tuple.getString(0) + "!"));
System.out.println(">>> DropoutBolt: collector.ack");
_collector.ack(tuple);
} else {
System.out.println(">>> DropoutBolt: collector.fail");
_collector.fail(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("exclamation"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence", new TestSpout(), 1);
builder.setBolt("exclamation", new DropoutBolt(), 1).shuffleGrouping("sentence");
Config conf = new Config();
conf.setDebug(true);
// timeout
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 2);
conf.setNumAckers(2);
// run local cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("DropoutTopology", conf, builder.createTopology());
// sleep 20 seconds
Utils.sleep(1000 * 200);
// kill topology
cluster.killTopology("DropoutTopology");
cluster.shutdown();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment