Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Storm Bolt Branch Decision sample
package stormmon.sample;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
* Stormの分岐確認テスト
*
* @author kimutansk
*/
public class DecisionTestTopology
{
/**
* Default Constructor
*/
public DecisionTestTopology()
{}
/**
* 受信した単語長に合わせてTupleの送付先を切り替えるBolt
*
* @author kimutansk
*/
public static class JudgeBolt extends BaseRichBolt
{
OutputCollector collector_;
int index_;
@Override
public void prepare(Map conf, TopologyContext context,
OutputCollector collector)
{
this.collector_ = collector;
this.index_ = context.getThisTaskIndex();
}
@Override
public void execute(Tuple tuple)
{
String word = tuple.getString(0);
if (word.length() <= 5)
{
System.out.println("JudgeBolt_" + this.index_ + "ToShortWord:"
+ word);
// 短い単語用のStreamに流す
this.collector_.emit("ShortWord", new Values(word));
}
else
{
System.out.println("JudgeBolt_" + this.index_ + "ToLongWord:"
+ word);
// 長い単語用のStreamに流す
this.collector_.emit("LongWord", new Values(word));
}
this.collector_.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
// 分岐用の2個のStremを定義
declarer.declareStream("ShortWord", new Fields("word"));
declarer.declareStream("LongWord", new Fields("word"));
}
}
/**
* 短い単語を出力するBolt
*
* @author kimutansk
*/
public static class ShortWordBolt extends BaseRichBolt
{
OutputCollector collector_;
int index_;
@Override
public void prepare(Map conf, TopologyContext context,
OutputCollector collector)
{
this.collector_ = collector;
this.index_ = context.getThisTaskIndex();
}
@Override
public void execute(Tuple tuple)
{
String word = tuple.getString(0);
System.out.println("ShortWordBolt_" + this.index_ + ":" + word);
this.collector_.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{}
}
/**
* 長い単語を出力するBolt
*
* @author kimutansk
*/
public static class LongWordBolt extends BaseRichBolt
{
OutputCollector collector_;
int index_;
@Override
public void prepare(Map conf, TopologyContext context,
OutputCollector collector)
{
this.collector_ = collector;
this.index_ = context.getThisTaskIndex();
}
@Override
public void execute(Tuple tuple)
{
String word = tuple.getString(0);
System.out.println("LongWordBolt_" + this.index_ + ":" + word);
this.collector_.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{}
}
public static void main(String[] args) throws Exception
{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("WordSpout", new TestWordSpout(), 2);
builder.setBolt("JudgeBolt", new JudgeBolt(), 5).fieldsGrouping(
"WordSpout", new Fields("word"));
// ShortWordのStreamを読み込むよう定義
builder.setBolt("ShortWord", new ShortWordBolt(), 5).fieldsGrouping(
"JudgeBolt", "ShortWord", new Fields("word"));
// LongWordのStreamを読み込むよう定義
builder.setBolt("LongWord", new LongWordBolt(), 5).fieldsGrouping(
"JudgeBolt", "LongWord", new Fields("word"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("JudgeTest", conf, builder.createTopology());
Utils.sleep(10000000);
cluster.killTopology("JudgeTest");
cluster.shutdown();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment