Created
July 17, 2014 02:21
-
-
Save manku-timma/2909c721fc077483f0b7 to your computer and use it in GitHub Desktop.
Example Storm topology that reads lines from stdin and wraps each line in parentheses (a modification of an earlier program)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.starter; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.testing.TestWordSpout; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.task.ShellBolt; | |
import backtype.storm.topology.IRichBolt; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import backtype.storm.tuple.Values; | |
import backtype.storm.utils.Utils; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import java.util.Map; | |
import java.io.*; | |
public class BracketTopology { | |
public static class ReadFromStdinSpout extends BaseRichSpout { | |
SpoutOutputCollector _collector; | |
@Override | |
public void nextTuple() { | |
System.out.println("Enter some input: "); | |
BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in)); | |
String line = null; | |
try { | |
line = stdin.readLine(); | |
} catch (IOException ioe) { | |
System.out.println("Error while reading from stdin"); | |
System.exit(1); | |
} | |
_collector.emit(new Values(line)); | |
} | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
_collector = collector; | |
} | |
@Override public void ack(Object id) { } | |
@Override public void fail(Object id) { } | |
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } | |
} | |
public static class AddBracket extends BaseRichBolt { | |
OutputCollector _collector; | |
@Override | |
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { | |
_collector = collector; | |
} | |
@Override | |
public void execute(Tuple tuple) { | |
_collector.emit(tuple, new Values("(" + tuple.getString(0) + ")")); | |
_collector.ack(tuple); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("output")); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
return null; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("words", new ReadFromStdinSpout(), 1); | |
builder.setBolt("addbracket", new AddBracket(), 5).shuffleGrouping("words"); | |
Config conf = new Config(); | |
conf.setDebug(true); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("bracket", conf, builder.createTopology()); | |
Utils.sleep(100000); | |
cluster.killTopology("bracket"); | |
cluster.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment