|
import java.util.Map; |
|
import org.apache.storm.Config; |
|
import org.apache.storm.ILocalCluster; |
|
import org.apache.storm.Testing; |
|
import org.apache.storm.generated.StormTopology; |
|
import org.apache.storm.spout.SpoutOutputCollector; |
|
import org.apache.storm.task.OutputCollector; |
|
import org.apache.storm.task.TopologyContext; |
|
import org.apache.storm.testing.CompleteTopologyParam; |
|
import org.apache.storm.testing.MkClusterParam; |
|
import org.apache.storm.testing.MockedSources; |
|
import org.apache.storm.testing.TestJob; |
|
import org.apache.storm.topology.BasicOutputCollector; |
|
import org.apache.storm.topology.OutputFieldsDeclarer; |
|
import org.apache.storm.topology.TopologyBuilder; |
|
import org.apache.storm.topology.base.BaseBasicBolt; |
|
import org.apache.storm.topology.base.BaseRichBolt; |
|
import org.apache.storm.topology.base.BaseRichSpout; |
|
import org.apache.storm.tuple.Fields; |
|
import org.apache.storm.tuple.Tuple; |
|
import org.apache.storm.tuple.Values; |
|
import java.util.Arrays; |
|
import java.util.List; |
|
import static junit.framework.Assert.*; |
|
import org.junit.Test; |
|
|
|
public class StormTestExample { |
|
private final static String EVENT = "event"; |
|
private final static String SPOUT_ID = "spout"; |
|
private final static String BOLT_ID = "parrot"; |
|
private final static List<String> COMPONENT_IDS = Arrays.asList(SPOUT_ID, BOLT_ID); |
|
|
|
@Test |
|
public void testBasicTopology() { |
|
final MkClusterParam mkClusterParam = new MkClusterParam(); |
|
mkClusterParam.setSupervisors(4); |
|
final Config daemonConf = new Config(); |
|
daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false); |
|
mkClusterParam.setDaemonConf(daemonConf); |
|
|
|
Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() { |
|
@Override |
|
public void run(ILocalCluster cluster) { |
|
TopologyBuilder builder = new TopologyBuilder(); |
|
builder.setSpout(SPOUT_ID, new TestSpout()); |
|
builder.setBolt(BOLT_ID, new BaseBasicParrotBolt()).shuffleGrouping(SPOUT_ID); |
|
StormTopology topology = builder.createTopology(); |
|
|
|
MockedSources mockedSources = new MockedSources(); |
|
mockedSources.addMockData(SPOUT_ID, |
|
new Values("nathan"), |
|
new Values("bob"), |
|
new Values("joey"), |
|
new Values("nathan")); |
|
|
|
final Config conf = new Config(); |
|
conf.setNumWorkers(2); |
|
|
|
final CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); |
|
completeTopologyParam.setMockedSources(mockedSources); |
|
completeTopologyParam.setStormConf(conf); |
|
|
|
final Map result = Testing.completeTopology(cluster, topology, completeTopologyParam); |
|
|
|
final Values expected = new Values(new Values("nathan"), new Values("bob"), new Values("joey"), |
|
new Values("nathan")); |
|
|
|
for (String component : COMPONENT_IDS) { |
|
assertTrue("Error in " + component + " output", |
|
Testing.multiseteq(expected, Testing.readTuples(result, component))); |
|
} |
|
} |
|
}); |
|
} |
|
|
|
private static class TestSpout extends BaseRichSpout { |
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer ofd) { |
|
ofd.declare(new Fields(EVENT)); |
|
} |
|
|
|
@Override |
|
public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) { |
|
throw new UnsupportedOperationException(); // Must override, but not needed for test. |
|
} |
|
|
|
@Override |
|
public void nextTuple() { |
|
throw new UnsupportedOperationException(); // Must override, but not needed for test. |
|
} |
|
} |
|
|
|
private static class BaseBasicParrotBolt extends BaseBasicBolt { |
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer ofd) { |
|
ofd.declare(new Fields(EVENT)); |
|
} |
|
|
|
@Override |
|
public void execute(Tuple tuple, BasicOutputCollector boc) { |
|
boc.emit(new Values(tuple.getValue(0))); |
|
} |
|
} |
|
|
|
private static class BaseRichParrotBolt extends BaseRichBolt { |
|
private OutputCollector oc; |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer ofd) { |
|
ofd.declare(new Fields(EVENT)); |
|
} |
|
|
|
@Override |
|
public void prepare(Map map, TopologyContext tc, OutputCollector oc) { |
|
this.oc = oc; |
|
} |
|
|
|
@Override |
|
public void execute(Tuple tuple) { |
|
oc.emit(new Values(tuple.getValue(0))); |
|
} |
|
|
|
@Override |
|
public final Map<String, Object> getComponentConfiguration() { |
|
Config config = new Config(); |
|
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, null); |
|
return config; |
|
} |
|
} |
|
} |
I m confused that how does the ILocalCluster get initialized?