Skip to content

Instantly share code, notes, and snippets.

@elvanja
Last active December 23, 2015 04:08
Show Gist options
  • Save elvanja/6577832 to your computer and use it in GitHub Desktop.
package org.testfu.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.TupleCollectionGet;
import storm.trident.testing.MemoryMapState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Creates a single stream with all metrics.
* State queries extract and group the needed data.
*/
public class CombinedTopology {

    /** Name of the output field carrying the aggregated message count. */
    public static final String MSG_COUNT_FIELD_NAME = "msgCountTotal";

    /**
     * Assembles the topology: one persistent state grouped by
     * status/channel/gateway, plus one DRPC query stream per metric, all
     * reading from that single shared state.
     *
     * @param drpc in-process DRPC instance for local testing, or {@code null}
     *             when submitting to a real cluster (the DRPC streams then
     *             bind to the cluster's DRPC servers)
     * @return the built topology, ready for submission
     */
    public static StormTopology buildTopology(LocalDRPC drpc) {
        TridentTopology topology = new TridentTopology();
        TridentState messageState = buildMessageState(topology);
        buildAllQuery(drpc, topology, messageState);
        buildByGatewaysQuery(drpc, topology, messageState);
        buildByChannelsQuery(drpc, topology, messageState);
        buildByStatusesQuery(drpc, topology, messageState);
        return topology.build();
    }

    /**
     * Creates the single persistent state: message counts summed per
     * (statusId, smsChannelId, gatewayId) group, held in memory.
     */
    private static TridentState buildMessageState(TridentTopology topology) {
        return topology.newStream("messageState", new RandomMessageSpout())
            .groupBy(new Fields("statusId", "smsChannelId", "gatewayId"))
            .persistentAggregate(new MemoryMapState.Factory(), new Fields("msgCount"), new Sum(), new Fields(MSG_COUNT_FIELD_NAME));
    }

    /** Query for the overall total (no grouping fields). */
    private static void buildAllQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) {
        buildQueryFor(drpc, topology, messageState, "total");
    }

    /** Query for totals grouped by gateway. */
    private static void buildByGatewaysQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) {
        buildQueryFor(drpc, topology, messageState, "byGateway", "gatewayId");
    }

    /** Query for totals grouped by SMS channel. */
    private static void buildByChannelsQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) {
        buildQueryFor(drpc, topology, messageState, "byChannel", "smsChannelId");
    }

    /** Query for totals grouped by message status. */
    private static void buildByStatusesQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) {
        buildQueryFor(drpc, topology, messageState, "byStatus", "statusId");
    }

    /**
     * Wires a DRPC stream that dumps the shared state, drops tuples with a
     * null count, regroups by the requested fields, and sums the counts.
     *
     * @param name    DRPC function name clients invoke
     * @param groupBy state fields to regroup on; empty for the overall total
     */
    private static void buildQueryFor(LocalDRPC drpc, TridentTopology topology, TridentState messageState, String name, String... groupBy) {
        List<String> allFields = new ArrayList<String>(Arrays.asList(groupBy));
        allFields.add(MSG_COUNT_FIELD_NAME);
        topology.newDRPCStream(name, drpc)
            .stateQuery(messageState, new TupleCollectionGet(), new Fields(allFields))
            .each(new Fields(MSG_COUNT_FIELD_NAME), new FilterNull())
            .groupBy(new Fields(groupBy))
            .aggregate(new Fields(MSG_COUNT_FIELD_NAME), new Sum(), new Fields("total"))
        ;
    }

    /**
     * With no arguments: runs an in-process cluster and polls each DRPC query
     * once a second for 100 iterations. With an argument: submits the
     * topology to a remote cluster under that name.
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("combined", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                // execute topology state query, passing in the name of the drpc stream
                System.out.println("total: " + drpc.execute("total", ""));
                System.out.println("all gateways: " + drpc.execute("byGateway", ""));
                System.out.println("all channels: " + drpc.execute("byChannel", ""));
                System.out.println("all statuses: " + drpc.execute("byStatus", ""));
                Thread.sleep(1000);
            }
            // Tear down the in-process cluster so its threads do not keep the
            // JVM alive after the demo loop completes.
            drpc.shutdown();
            cluster.shutdown();
        }
        else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
}
package org.testfu.storm;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.jgroups.util.Util;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import java.util.Date;
import java.util.Map;
import java.util.Random;
/**
* When implemented as BaseRichSpout, it caused issues with Util.sleep, the data was emitted but not persisted.
* Not an issue when directly implementing IBatchSpout.
*/
public class RandomMessageSpout implements IBatchSpout {

    /** Upper bound (inclusive) for the random number of message parts. */
    public static final int MAX_MESSAGE_PARTS = 3;

    // Candidate values hoisted to constants so they are not re-allocated on
    // every emitted tuple.
    private static final int[] STATUSES = { 1, 2, 3, 4, 5 };
    private static final int[] CHANNELS = { 101, 102, 103, 104, 105 };
    private static final int[] GATEWAYS = { 10, 20, 30, 40, 50 };

    Random randomGenerator;

    @Override
    public void open(Map map, TopologyContext topologyContext) {
        this.randomGenerator = new Random();
    }

    /**
     * Emits one random message tuple per batch, throttled to roughly ten
     * batches per second via Util.sleep.
     */
    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Util.sleep(100);
        collector.emit(new Values(
            new Date(),
            randomChannel(),
            randomGateway(),
            randomMessageParts(),
            randomStatus()
        ));
    }

    @Override
    public void ack(long batchId) {
        // Nothing to acknowledge: emitted data is random and not replayable.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public Map getComponentConfiguration() {
        // Single task only, so the emitted stream is a single ordered sequence.
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(
            "sentDateTime",
            "smsChannelId",
            "gatewayId",
            "messageParts",
            "statusId"
        );
    }

    /** Picks a random status id from {@link #STATUSES}. */
    private int randomStatus() {
        return STATUSES[randomGenerator.nextInt(STATUSES.length)];
    }

    /** Picks a random channel id from {@link #CHANNELS}. */
    private int randomChannel() {
        return CHANNELS[randomGenerator.nextInt(CHANNELS.length)];
    }

    /** Picks a random gateway id from {@link #GATEWAYS}. */
    private int randomGateway() {
        return GATEWAYS[randomGenerator.nextInt(GATEWAYS.length)];
    }

    /** Returns a part count in [1, MAX_MESSAGE_PARTS]. */
    private int randomMessageParts() {
        return randomGenerator.nextInt(MAX_MESSAGE_PARTS) + 1;
    }
}
package org.testfu.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.TupleCollectionGet;
import storm.trident.testing.MemoryMapState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Creates a separate stream and query state for each metric.
* Uses the same spout as the data source.
*/
public class SeparateTopology {

    /** Name of the output field carrying the aggregated message count. */
    public static final String MSG_COUNT_FIELD_NAME = "msgCountTotal";

    /**
     * Assembles the topology: unlike the combined variant, each metric gets
     * its own stream and its own persistent state, all fed by the same spout.
     *
     * @param drpc in-process DRPC instance for local testing, or {@code null}
     *             when submitting to a real cluster (the DRPC streams then
     *             bind to the cluster's DRPC servers)
     * @return the built topology, ready for submission
     */
    public static StormTopology buildTopology(LocalDRPC drpc) {
        TridentTopology topology = new TridentTopology();
        RandomMessageSpout randomMessageSpout = new RandomMessageSpout();
        buildAllQuery(drpc, topology, randomMessageSpout);
        buildByGatewayQuery(drpc, topology, randomMessageSpout);
        buildByChannelQuery(drpc, topology, randomMessageSpout);
        buildByStatusesQuery(drpc, topology, randomMessageSpout);
        return topology.build();
    }

    /** Stream + query for the overall total (no grouping fields). */
    private static void buildAllQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) {
        buildQueryFor(drpc, topology, randomMessageSpout, "total");
    }

    /** Stream + query for totals grouped by gateway. */
    private static void buildByGatewayQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) {
        buildQueryFor(drpc, topology, randomMessageSpout, "byGateway", "gatewayId");
    }

    /** Stream + query for totals grouped by SMS channel. */
    private static void buildByChannelQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) {
        buildQueryFor(drpc, topology, randomMessageSpout, "byChannel", "smsChannelId");
    }

    /** Stream + query for totals grouped by message status. */
    private static void buildByStatusesQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) {
        buildQueryFor(drpc, topology, randomMessageSpout, "byStatus", "statusId");
    }

    /**
     * Wires a dedicated stream that aggregates counts per the given fields
     * into its own in-memory state, plus a DRPC stream that dumps that state,
     * drops null counts, regroups, and sums.
     *
     * @param name    DRPC function name clients invoke; also prefixes the stream name
     * @param groupBy fields to group on; empty for the overall total
     */
    private static void buildQueryFor(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout, String name, String... groupBy) {
        TridentState queryState =
            topology.newStream(name + "Stream", randomMessageSpout)
                .groupBy(new Fields(groupBy))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("msgCount"), new Sum(), new Fields(MSG_COUNT_FIELD_NAME))
            ;
        List<String> allFields = new ArrayList<String>(Arrays.asList(groupBy));
        allFields.add(MSG_COUNT_FIELD_NAME);
        topology.newDRPCStream(name, drpc)
            .stateQuery(queryState, new TupleCollectionGet(), new Fields(allFields))
            .each(new Fields(MSG_COUNT_FIELD_NAME), new FilterNull())
            .groupBy(new Fields(groupBy))
            .aggregate(new Fields(MSG_COUNT_FIELD_NAME), new Sum(), new Fields("total"))
        ;
    }

    /**
     * With no arguments: runs an in-process cluster and polls each DRPC query
     * once a second for 100 iterations. With an argument: submits the
     * topology to a remote cluster under that name.
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("separate", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                // execute topology state query, passing in the name of the drpc stream
                System.out.println("total: " + drpc.execute("total", ""));
                System.out.println("all gateways: " + drpc.execute("byGateway", ""));
                System.out.println("all channels: " + drpc.execute("byChannel", ""));
                System.out.println("all statuses: " + drpc.execute("byStatus", ""));
                Thread.sleep(1000);
            }
            // Tear down the in-process cluster so its threads do not keep the
            // JVM alive after the demo loop completes.
            drpc.shutdown();
            cluster.shutdown();
        }
        else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment