Created
November 28, 2012 23:13
-
-
Save kimutansk/4165446 to your computer and use it in GitHub Desktop.
Storm Bolt Branch Decision sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package stormmon.sample; | |
import java.util.Map; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.testing.TestWordSpout; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import backtype.storm.tuple.Values; | |
import backtype.storm.utils.Utils; | |
/** | |
* Stormの分岐確認テスト | |
* | |
* @author kimutansk | |
*/ | |
public class DecisionTestTopology | |
{ | |
/** | |
* Default Constructor | |
*/ | |
public DecisionTestTopology() | |
{} | |
/** | |
* 受信した単語長に合わせてTupleの送付先を切り替えるBolt | |
* | |
* @author kimutansk | |
*/ | |
public static class JudgeBolt extends BaseRichBolt | |
{ | |
OutputCollector collector_; | |
int index_; | |
@Override | |
public void prepare(Map conf, TopologyContext context, | |
OutputCollector collector) | |
{ | |
this.collector_ = collector; | |
this.index_ = context.getThisTaskIndex(); | |
} | |
@Override | |
public void execute(Tuple tuple) | |
{ | |
String word = tuple.getString(0); | |
if (word.length() <= 5) | |
{ | |
System.out.println("JudgeBolt_" + this.index_ + "ToShortWord:" | |
+ word); | |
// 短い単語用のStreamに流す | |
this.collector_.emit("ShortWord", new Values(word)); | |
} | |
else | |
{ | |
System.out.println("JudgeBolt_" + this.index_ + "ToLongWord:" | |
+ word); | |
// 長い単語用のStreamに流す | |
this.collector_.emit("LongWord", new Values(word)); | |
} | |
this.collector_.ack(tuple); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) | |
{ | |
// 分岐用の2個のStremを定義 | |
declarer.declareStream("ShortWord", new Fields("word")); | |
declarer.declareStream("LongWord", new Fields("word")); | |
} | |
} | |
/** | |
* 短い単語を出力するBolt | |
* | |
* @author kimutansk | |
*/ | |
public static class ShortWordBolt extends BaseRichBolt | |
{ | |
OutputCollector collector_; | |
int index_; | |
@Override | |
public void prepare(Map conf, TopologyContext context, | |
OutputCollector collector) | |
{ | |
this.collector_ = collector; | |
this.index_ = context.getThisTaskIndex(); | |
} | |
@Override | |
public void execute(Tuple tuple) | |
{ | |
String word = tuple.getString(0); | |
System.out.println("ShortWordBolt_" + this.index_ + ":" + word); | |
this.collector_.ack(tuple); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) | |
{} | |
} | |
/** | |
* 長い単語を出力するBolt | |
* | |
* @author kimutansk | |
*/ | |
public static class LongWordBolt extends BaseRichBolt | |
{ | |
OutputCollector collector_; | |
int index_; | |
@Override | |
public void prepare(Map conf, TopologyContext context, | |
OutputCollector collector) | |
{ | |
this.collector_ = collector; | |
this.index_ = context.getThisTaskIndex(); | |
} | |
@Override | |
public void execute(Tuple tuple) | |
{ | |
String word = tuple.getString(0); | |
System.out.println("LongWordBolt_" + this.index_ + ":" + word); | |
this.collector_.ack(tuple); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) | |
{} | |
} | |
public static void main(String[] args) throws Exception | |
{ | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("WordSpout", new TestWordSpout(), 2); | |
builder.setBolt("JudgeBolt", new JudgeBolt(), 5).fieldsGrouping( | |
"WordSpout", new Fields("word")); | |
// ShortWordのStreamを読み込むよう定義 | |
builder.setBolt("ShortWord", new ShortWordBolt(), 5).fieldsGrouping( | |
"JudgeBolt", "ShortWord", new Fields("word")); | |
// LongWordのStreamを読み込むよう定義 | |
builder.setBolt("LongWord", new LongWordBolt(), 5).fieldsGrouping( | |
"JudgeBolt", "LongWord", new Fields("word")); | |
Config conf = new Config(); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("JudgeTest", conf, builder.createTopology()); | |
Utils.sleep(10000000); | |
cluster.killTopology("JudgeTest"); | |
cluster.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment