Skip to content

Instantly share code, notes, and snippets.

@biskandar
Created July 12, 2017 12:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save biskandar/4c8f922cf6d6dd033409e886f9657615 to your computer and use it in GitHub Desktop.
Save biskandar/4c8f922cf6d6dd033409e886f9657615 to your computer and use it in GitHub Desktop.
package com.disney.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class TestTridentTopology2 {
@SuppressWarnings("serial")
public static class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static FixedBatchSpout createFixedBatchSpout(int maxBatchSize) {
@SuppressWarnings("unchecked")
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), maxBatchSize,
new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"), new Values("how many apples can you eat"));
spout.setCycle(true);
return spout;
}
public static TridentState setupTridentState(TridentTopology tridentTopology, String spoutName,
IBatchSpout spoutObject) {
TridentState tridentState = tridentTopology.newStream(spoutName, spoutObject)
.each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(6);
return tridentState;
}
public static void main(String[] args) {
System.out.println("----- begin");
LocalCluster cluster = null;
try {
int maxBatchSize = 3;
System.out.println("----- prepare fixed batch spout : maxBatchSize = " + maxBatchSize);
FixedBatchSpout fixedBatchSpout = createFixedBatchSpout(maxBatchSize);
System.out.println("----- create trident topology");
TridentTopology tridentTopology = new TridentTopology();
System.out.println("----- setup trident state");
TridentState tridentState = setupTridentState(tridentTopology, "fixedBatchSpout", fixedBatchSpout);
System.out.println("----- setup config : debug = false ");
Config config = new Config();
config.setDebug(false);
config.put(Config.TOPOLOGY_DEBUG, "false");
config.setMaxSpoutPending(20);
System.out.println("----- opening up cluster");
cluster = new LocalCluster();
String topologyName = "topology-name-1";
System.out.println("----- submit topology : topology-name = " + topologyName);
cluster.submitTopology(topologyName, config, tridentTopology.build());
for (int idx = 5; idx > 0; idx--) {
System.out.println("----- wait for couple seconds : counter = " + idx);
Thread.sleep(1000);
}
System.out.println("----- kill topology : topology-name = " + topologyName);
cluster.killTopology(topologyName);
} catch (Exception e) {
System.err.println("---- Failed to run , " + e);
} finally {
if (cluster != null) {
System.out.println("----- shutting down cluster");
cluster.shutdown();
}
}
System.out.println("----- end");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment