Skip to content

Instantly share code, notes, and snippets.

@manku-timma
Created July 17, 2014 02:21
Show Gist options
  • Save manku-timma/2909c721fc077483f0b7 to your computer and use it in GitHub Desktop.
Save manku-timma/2909c721fc077483f0b7 to your computer and use it in GitHub Desktop.
Example Storm program that reads lines from stdin and does some computation on them (a modification of an earlier program)
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import java.util.Map;
import java.io.*;
public class BracketTopology {
/**
 * Spout that prompts on stdout, reads one line from standard input per
 * {@link #nextTuple()} call and emits it as a one-field tuple named "word".
 *
 * <p>Reading stdin blocks the calling thread until a line is available. That is
 * the point of this example, but NOTE(review): the Storm spout contract
 * recommends a non-blocking {@code nextTuple()} - confirm this is acceptable
 * before reusing the class outside a local demo.
 */
public static class ReadFromStdinSpout extends BaseRichSpout {
  /** How long to idle per {@code nextTuple()} call once stdin is exhausted. */
  private static final long EOF_IDLE_MILLIS = 100L;

  SpoutOutputCollector _collector;

  /**
   * Single reader shared by all {@code nextTuple()} calls. A BufferedReader
   * reads ahead, so building a new one per call would drop whatever input the
   * previous one had already buffered. Created in {@code open()} and marked
   * transient so it is never part of the spout's serialized form.
   */
  private transient BufferedReader _stdin;

  /** Set once {@code readLine()} has reported end of stream. */
  private transient boolean _eof;

  /**
   * Prompts for input, reads one line from stdin and emits it.
   *
   * <p>After end of input has been reached nothing more is emitted; the method
   * only sleeps briefly so that repeated calls do not spin. An I/O error while
   * reading prints a message and terminates the JVM with exit status 1.
   */
  @Override
  public void nextTuple() {
    if (_eof) {
      Utils.sleep(EOF_IDLE_MILLIS);
      return;
    }
    System.out.println("Enter some input: ");
    String line = null;
    try {
      line = _stdin.readLine();
    } catch (IOException ioe) {
      System.out.println("Error while reading from stdin");
      System.exit(1);
    }
    if (line == null) {
      // readLine() returns null at end of stream; emitting it would send a
      // null "word" downstream, so remember the condition and stop reading.
      _eof = true;
      return;
    }
    _collector.emit(new Values(line));
  }

  /**
   * Stores the collector and sets up the stdin reader used by
   * {@code nextTuple()}.
   *
   * @param conf      topology configuration (unused)
   * @param context   topology context (unused)
   * @param collector collector used to emit the lines that are read
   */
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _stdin = new BufferedReader(new InputStreamReader(System.in));
    _eof = false;
  }

  /** No-op: tuples are emitted without a message id, so nothing is tracked. */
  @Override
  public void ack(Object id) { }

  /** No-op: tuples are emitted without a message id, so nothing is replayed. */
  @Override
  public void fail(Object id) { }

  /** Declares the single output field, "word". */
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
/**
 * Bolt that wraps the first field of every incoming tuple in parentheses and
 * emits the result as a one-field tuple named "output".
 */
public static class AddBracket extends BaseRichBolt {
  /** Collector received in {@code prepare()}; used to emit and to ack. */
  OutputCollector _collector;

  /**
   * Remembers the collector for later use by {@code execute()}.
   *
   * @param conf      topology configuration (unused)
   * @param context   topology context (unused)
   * @param collector collector to emit and ack through
   */
  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this._collector = collector;
  }

  /**
   * Emits "(" + first field + ")" anchored to the input tuple, then acks the
   * input tuple.
   *
   * @param tuple incoming tuple whose field 0 is read as a string
   */
  @Override
  public void execute(Tuple tuple) {
    final String word = tuple.getString(0);
    final Values bracketed = new Values("(" + word + ")");
    this._collector.emit(tuple, bracketed);
    this._collector.ack(tuple);
  }

  /** Declares the single output field, "output". */
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    final Fields outputFields = new Fields("output");
    declarer.declare(outputFields);
  }

  /**
   * Supplies no component-specific configuration.
   *
   * @return always {@code null}
   */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }
}
/**
 * Wires the stdin spout to the bracket-adding bolt, runs the topology in an
 * in-process {@link LocalCluster} for a fixed time, then kills it and shuts
 * the cluster down.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building, submitting or killing the topology fails
 */
public static void main(String[] args) throws Exception {
  final String topologyName = "bracket";
  final long runMillis = 100000L;

  TopologyBuilder builder = new TopologyBuilder();
  // One spout instance: there is only a single stdin to read from.
  builder.setSpout("words", new ReadFromStdinSpout(), 1);
  builder.setBolt("addbracket", new AddBracket(), 5).shuffleGrouping("words");

  Config conf = new Config();
  conf.setDebug(true);

  LocalCluster cluster = new LocalCluster();
  try {
    cluster.submitTopology(topologyName, conf, builder.createTopology());
    Utils.sleep(runMillis);
    cluster.killTopology(topologyName);
  } finally {
    // Always shut the cluster down, even when submit/kill throws, so the
    // in-process cluster is not left running.
    cluster.shutdown();
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment