Skip to content

Instantly share code, notes, and snippets.

Created August 10, 2013 10:30
Show Gist options
  • Save anonymous/6199942 to your computer and use it in GitHub Desktop.
Save anonymous/6199942 to your computer and use it in GitHub Desktop.
학습 목적으로 storm-starter의 WordCountTopology를 직접 구현해 보았습니다. storm-starter의 예제 코드가 잘 되어 있고 간단한 사용은 어렵지 않게 되어 있어서 금방 만들고 확인할 수 있었네요.
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.regex.Pattern;
public class DIYWordCountTopologyForLocalCluster {
/**
 * Bolt that keeps a running count for every word it receives.
 *
 * <p>For each input tuple it reads the word from field 0, increments that
 * word's count in an in-memory map held by this bolt instance, and emits a
 * {@code (word, count)} tuple with the input tuple as its anchor before
 * acking the input.
 */
public static class CountWordBolt extends BaseRichBolt {
    OutputCollector _collector;

    /** Occurrences seen so far, keyed by word; local to this bolt instance. */
    private Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        int updated = bumpCount(word);
        _collector.emit(input, new Values(word, updated));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /**
     * Adds one to the stored count for {@code word} and returns the new total.
     *
     * @param word the word whose count should be incremented
     * @return the count after incrementing (1 the first time a word is seen)
     */
    private int bumpCount(String word) {
        Integer previous = counts.get(word);
        int updated = (previous == null) ? 1 : previous + 1;
        counts.put(word, updated);
        return updated;
    }
}
/**
 * Bolt that breaks a sentence into words.
 *
 * <p>The sentence is read from field 0 of the input tuple and split on single
 * spaces. Commas, periods, colons and double quotes are removed from every
 * token, and each resulting non-empty word is emitted as a one-field tuple
 * with the input tuple as its anchor. The input is acked once all of its
 * words have been emitted.
 */
public static class SplitBolt extends BaseRichBolt {
    /**
     * Characters stripped from each token. Compiled once instead of on every
     * {@code String.replaceAll} call inside the per-token loop.
     */
    private static final Pattern PUNCTUATION = Pattern.compile(",|\\.|:|\"");

    private OutputCollector _collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        for (String token : sentence.split(" ")) {
            String word = PUNCTUATION.matcher(token).replaceAll("");
            // Consecutive spaces or punctuation-only tokens leave an empty
            // string; emitting it would make downstream bolts count "" as a word.
            if (word.isEmpty()) {
                continue;
            }
            _collector.emit(input, new Values(word));
        }
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
/**
 * Spout that emits one randomly selected sentence from a fixed list on every
 * {@link #nextTuple()} call.
 *
 * <p>Tuples are emitted without a message id, and {@link #ack(Object)} /
 * {@link #fail(Object)} do nothing.
 */
public static class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;

    /** Source of randomness; created in {@link #open}. */
    private Random random;

    /** The fixed pool of sentences to pick from. */
    private String[] corpus = {
        "Once when I was six years old I saw a magnificent picture in a book, called True Stories from Nature, about the primeval forest.",
        "It was a picture of a boa constrictor in the act of swallowing an animal.",
        "Here is a copy of the drawing.",
        "In the book it said: \"Boa constrictors swallow their prey whole, without chewing it. After that they are not able to move, and they sleep through the six months that they need for digestion.\"",
        "I pondered deeply, then, over the adventures of the jungle.",
        "And after some work with a colored pencil I succeeded in making my first drawing.",
        "My Drawing Number One."
    };

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        random = new Random();
    }

    @Override
    public void nextTuple() {
        int pick = random.nextInt(corpus.length);
        Values tuple = new Values(corpus[pick]);
        _collector.emit(tuple);
    }

    @Override
    public void ack(Object msgId) {
        // Intentionally empty: nothing is tracked per message.
    }

    @Override
    public void fail(Object msgId) {
        // Intentionally empty: failed messages are not replayed.
    }
}
/**
 * Wires the spout and bolts into a topology and runs it on an in-process
 * {@link LocalCluster} for ten seconds.
 *
 * <p>Layout: the "sentence" spout feeds the "split" bolt via shuffle
 * grouping, and "split" feeds the "count" bolt via fields grouping on
 * "word".
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final String topologyName = "diy";

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentence", new RandomSentenceSpout(), 1);
    builder.setBolt("split", new SplitBolt(), 20)
        .shuffleGrouping("sentence");
    builder.setBolt("count", new CountWordBolt(), 10)
        .fieldsGrouping("split", new Fields("word"));

    Config config = new Config();
    config.setDebug(true);

    // Local Cluster test
    LocalCluster cluster = new LocalCluster();
    try {
        cluster.submitTopology(topologyName, config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology(topologyName);
    } finally {
        // Always tear the in-process cluster down, even when submission,
        // the wait, or the kill request throws.
        cluster.shutdown();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment