Skip to content

Instantly share code, notes, and snippets.

@manku-timma
Created July 16, 2014 08:37
Show Gist options
  • Save manku-timma/1d0832c113f836e75a81 to your computer and use it in GitHub Desktop.
Save manku-timma/1d0832c113f836e75a81 to your computer and use it in GitHub Desktop.
Sample Storm code with a spout and bolt to add brackets to input words
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
/**
 * Sample Storm topology that wraps every incoming word in parentheses.
 *
 * <p>The topology wires a {@link TestWordSpout} (component id {@code "words"}) to an
 * {@link AddBracket} bolt (component id {@code "addbracket"}) and runs it on an
 * in-process {@link LocalCluster} for a fixed amount of time.
 */
public class BracketTopology {

    /** Component id of the word-emitting spout. */
    private static final String SPOUT_ID = "words";

    /** Component id of the bracket-adding bolt. */
    private static final String BOLT_ID = "addbracket";

    /** Name under which the topology is submitted to (and killed on) the local cluster. */
    private static final String TOPOLOGY_NAME = "bracket";

    /** How long the topology is left running before it is killed, in milliseconds. */
    private static final long RUN_MILLIS = 10000;

    /**
     * Bolt that emits {@code "(" + word + ")"} for the string found in field 0 of each
     * input tuple, on a single output field named {@code "output"}.
     */
    public static class AddBracket extends BaseRichBolt {
        // Assigned in prepare(); Storm naming convention kept so existing references still work.
        OutputCollector _collector;

        /**
         * Stores the collector handed in by Storm so that {@link #execute(Tuple)} can use it.
         *
         * @param conf      topology configuration (unused here)
         * @param context   topology context (unused here)
         * @param collector collector used to emit and ack tuples
         */
        @Override
        @SuppressWarnings("rawtypes") // the overridden Storm method declares a raw Map
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        /**
         * Emits the bracketed form of the input word and acknowledges the input tuple.
         *
         * @param tuple input tuple; field 0 is read as a string
         */
        @Override
        public void execute(Tuple tuple) {
            String bracketed = "(" + tuple.getString(0) + ")";
            // Passing the input tuple as the first argument ties the emitted tuple to it.
            _collector.emit(tuple, new Values(bracketed));
            _collector.ack(tuple);
        }

        /**
         * Declares the single output field, {@code "output"}.
         *
         * @param declarer declarer on which the output schema is registered
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("output"));
        }
    }

    /**
     * Builds the topology, runs it on a local in-process cluster for
     * {@value #RUN_MILLIS} ms, then kills it and shuts the cluster down.
     *
     * @param args command-line arguments (ignored)
     * @throws Exception if building, submitting or killing the topology fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new TestWordSpout(), 10);
        builder.setBolt(BOLT_ID, new AddBracket(), 5).shuffleGrouping(SPOUT_ID);

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(RUN_MILLIS);
            cluster.killTopology(TOPOLOGY_NAME);
        } finally {
            // Always shut the cluster down, even if submitting or killing the topology
            // failed; otherwise the in-process cluster would be left running.
            cluster.shutdown();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment