Skip to content

Instantly share code, notes, and snippets.

@amedina
Created November 10, 2013 00:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amedina/7391993 to your computer and use it in GitHub Desktop.
Save amedina/7391993 to your computer and use it in GitHub Desktop.
Skeleton of STORM Exp Rollup Job
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.logging.AdEngagementLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdEngagementScheme implements Scheme {
  /**
   * Deserializes a Thrift binary-protocol {@code AdEngagementLogEntry} into a tuple of
   * (engagement key, request id, client id).
   *
   * @param bytes serialized AdEngagementLogEntry payload from the spout
   * @return a three-field tuple, or {@code null} if the message could not be deserialized
   */
  public List<Object> deserialize(byte[] bytes) {
    Values tuple = null;
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      AdEngagementLogEntry engagement = new AdEngagementLogEntry();
      thriftDeserializer.deserialize(engagement, bytes);
      ExpRollupRTEngKey engKey = ExpRollupRTUtil.createEngagementKey(engagement);
      // Assign rather than return early so this scheme matches the single-exit shape
      // of AdImpressionScheme / AdRequestScheme.
      tuple = new Values(engKey, engKey.reqId, engKey.clientId);
    } catch (Exception e) {
      // Include the cause: a bare message made bad payloads undiagnosable.
      System.err.println("Got exception while dealing with a message: " + e);
    }
    return tuple;
  }

  /** Field names matching the tuple emitted by {@link #deserialize(byte[])}. */
  public Fields getOutputFields() {
    return new Fields("engKey", "reqId", "clientId");
  }
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.eventstream.ImpressionCallbackEvent;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdImpressionScheme implements Scheme {
  /**
   * Deserializes a Thrift binary-protocol {@code ImpressionCallbackEvent} into a tuple of
   * (impression key, request id, client id).
   *
   * @param bytes serialized ImpressionCallbackEvent payload from the spout
   * @return a three-field tuple, or {@code null} if the message could not be deserialized
   */
  public List<Object> deserialize(byte[] bytes) {
    Values tuple = null;
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      ImpressionCallbackEvent impression = new ImpressionCallbackEvent();
      thriftDeserializer.deserialize(impression, bytes);
      ExpRollupRTImpCallBackKey impKey =
          ExpRollupRTUtil.createImpressionCallbackKey(impression);
      tuple = new Values(impKey, impKey.reqId, impKey.clientId);
    } catch (Exception e) {
      // Include the cause: a bare message made bad payloads undiagnosable.
      System.err.println("Got exception while dealing with a message: " + e);
    }
    return tuple;
  }

  /** Field names matching the tuple emitted by {@link #deserialize(byte[])}. */
  public Fields getOutputFields() {
    return new Fields("impKey", "reqId", "clientId");
  }
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.logging.AdShardRequestLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdRequestScheme implements Scheme {
  /**
   * Deserializes a Thrift binary-protocol {@code AdShardRequestLogEntry} into a tuple of
   * (request key, request id, client id). The key is built from the nested
   * AdRequestLogEntry carried by the shard entry.
   *
   * @param bytes serialized AdShardRequestLogEntry payload from the spout
   * @return a three-field tuple, or {@code null} if the message could not be deserialized
   */
  public List<Object> deserialize(byte[] bytes) {
    Values tuple = null;
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      AdShardRequestLogEntry request = new AdShardRequestLogEntry();
      thriftDeserializer.deserialize(request, bytes);
      ExpRollupRTReqKey reqKey =
          ExpRollupRTUtil.createRequestKey(request.getAdRequestLogEntry());
      tuple = new Values(reqKey, reqKey.reqId, reqKey.clientId);
    } catch (Exception e) {
      // Include the cause: a bare message made bad payloads undiagnosable.
      System.err.println("Got exception while dealing with a message: " + e);
    }
    return tuple;
  }

  /** Field names matching the tuple emitted by {@link #deserialize(byte[])}. */
  public Fields getOutputFields() {
    return new Fields("requestKey", "reqId", "clientId");
  }
}
# Library target for the exp-rollup realtime topology: compiles every .java file
# in this directory against storm, the kafka/kestrel spout adapters, thrift, the
# generated event/log thrift structs, and the twitter-common args parser.
java_library(name = 'exprolluprttopologylib',
dependencies = [
pants('3rdparty/storm'),
pants('3rdparty/storm:storm-kafka'),
pants('3rdparty/storm:storm-kestrel'),
pants('3rdparty:thrift'),
pants('3rdparty:util-thrift'),
# Generated thrift: eventstream (impressions), adserver logs (requests/engagements),
# and the exp-rollup key structs this topology emits.
pants('src/thrift/com/twitter/ads/eventstream:eventstream'),
pants('src/thrift/com/twitter/ads/adserver:adserver_log'),
pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
pants('src/java/com/twitter/common/args'),
],
sources = globs("*.java"),
)
# Deployable topology jar. storm/log4j/slf4j are excluded from the deploy jar
# because the storm cluster supplies them at runtime.
jvm_binary(name = 'exprolluprttopology',
# Fixed: main must match the actual package of ExpRollupRTTopology
# (com.twitter.ads.batch.experimental.amedina.storm.exprolluprt); the previous
# 'com.twitter.storm.samples.exprolluprt' path does not exist in these sources.
main = 'com.twitter.ads.batch.experimental.amedina.storm.exprolluprt.ExpRollupRTTopology',
dependencies = [
pants(':exprolluprttopologylib')
],
deploy_excludes = [
exclude('storm', 'storm'),
exclude('log4j', 'log4j'),
exclude('org.slf4j', 'slf4j-api'),
exclude('org.slf4j', 'slf4j-jdk14'),
exclude('org.slf4j', 'slf4j-log4j12'),
],
)
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Enable this when deserializing
// import com.twitter.ads.logging.AdEngagementLogEntry;
import com.twitter.common.args.Arg;
import com.twitter.common.args.ArgScanner;
import com.twitter.common.args.CmdLine;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.KestrelThriftSpout;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
/**
 * Wires up the experiment-rollup realtime topology: three spouts (ad requests and
 * engagements from kafka, impression callbacks from kestrel), each feeding a
 * placeholder bolt that currently just samples tuples to stdout.
 */
public final class ExpRollupRTTopology {

  // Static entry point only; can't be instantiated.
  private ExpRollupRTTopology() {
  }

  @CmdLine(name = "topology_name", help = "Experiment Rollup Real-time")
  public static final Arg<String> TOPOLOGY_NAME =
      Arg.create(System.getProperty("user.name") + "_expRollupRT_STRM");

  @CmdLine(name = "max_spout_pending", help = "Max number of unacked tuples from the spout")
  public static final Arg<Integer> MAX_SPOUT_PENDING = Arg.create(10);

  @CmdLine(name = "spout0_parallelism", help = "Parallelism hint for spout0")
  public static final Arg<Integer> SPOUT0_PARALLELISM = Arg.create(10);

  @CmdLine(name = "stdout_parallelism", help = "Parallelism hint for stdout bolt")
  public static final Arg<Integer> STDOUT_PARALLELISM = Arg.create(10);

  @CmdLine(name = "number_workers", help = "Number of workers in the topology")
  public static final Arg<Integer> NUMBER_WORKERS = Arg.create(3);

  @CmdLine(name = "number_ackers", help = "Number of ackers in the topology")
  public static final Arg<Integer> NUMBER_ACKERS = Arg.create(1);

  @CmdLine(name = "team_name", help = "Team which owns this topology")
  public static final Arg<String> TEAM_NAME = Arg.create("storm");

  @CmdLine(name = "team_email", help = "Team Email which owns this topology")
  public static final Arg<String> TEAM_EMAIL = Arg.create("streaming-compute@twitter.com");

  @CmdLine(name = "project_name", help = "Topology part of which project/feature work")
  public static final Arg<String> PROJECT_NAME = Arg.create("MyProject");

  /** External endpoints for the kafka and kestrel sources. */
  private static final class Constants {
    /** Zookeeper for discovering kafka brokers */
    public static final String KAFKA_BROKER_ZK_HOSTPORT = "szookeeper.smf1.twitter.com:2181";
    /** Path to kafka brokers in zookeeper */
    public static final String KAFKA_BROKER_ZK_PATH = "/";
    /** Zookeeper root */
    public static final String KAFKA_CONSUMER_ZK_PATH = "/kafkaconsumer";
    /** Kestrel hosts serving the impression-callback queue */
    public static final List<String> KESTREL_HOST_LIST =
        Arrays.asList("smf1-ajt-03-sr1.prod.twitter.com");
    /** Kestrel thrift port */
    public static final int KESTREL_PORT = 2229;

    private Constants() {
    }
  }

  /** Placeholder bolt for request tuples: samples ~10% and prints; emits nothing yet. */
  private static class ProcessRequestsBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessRequestsBolt.class.getName());
    private Random random = new Random();

    @Override
    public void prepare(Map conf, TopologyContext context) {
      super.prepare(conf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      // TODO: enable once the schema is final.
      //collector.emit(new Values(reqKey, reqKey.reqId, reqKey.clientId, 1L))
      // Primitive float: the boxed Float here was pointless autoboxing.
      float r = random.nextFloat();
      if (r <= 0.1) {
        System.out.println("***** Emitting request! ****");
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
      // TODO: declare once execute() actually emits.
      //outputFieldsDeclarer.declare(new Fields("requestKey", "reqId", "clientId", "count"))
    }
  }

  /** Placeholder bolt for impression tuples: samples ~10% and prints; emits nothing yet. */
  private static class ProcessImpresssionsBolt extends BaseBasicBolt {
    private static final Logger
        LOG = LoggerFactory.getLogger(ProcessImpresssionsBolt.class.getName());
    private Random random = new Random();

    @Override
    public void prepare(Map conf, TopologyContext context) {
      super.prepare(conf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      float r = random.nextFloat();
      if (r <= 0.1) {
        System.out.println("***** Emitting Impression! *****");
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
  }

  /** Placeholder bolt for engagement tuples: samples ~10% and prints; emits nothing yet. */
  private static class ProcessEngagementsBolt extends BaseBasicBolt {
    private Random random = new Random();
    private static final Logger
        LOG = LoggerFactory.getLogger(ProcessEngagementsBolt.class.getName());

    @Override
    public void prepare(Map conf, TopologyContext context) {
      super.prepare(conf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      float r = random.nextFloat();
      if (r <= 0.1) {
        System.out.println("***** Emitting Engagement! *****");
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
  }

  /**
   * Kafka spout over the adshard_requests topic; the consumer id embeds the topology
   * name so parallel deployments don't share offsets.
   */
  private static KafkaSpout buildAdRequestsSpout(String topologyName) {
    SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts(
        "szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"),
        "adshard_requests",
        "/kafkaconsumer",
        topologyName + "-adshard_requests");
    spoutConfig.scheme = new SchemeAsMultiScheme(new AdRequestScheme());
    // -1: start from the latest offset.
    spoutConfig.forceStartOffsetTime(-1);
    spoutConfig.zkPort = 2181;
    return new KafkaSpout(spoutConfig);
  }

  /**
   * Kestrel spout over the ad_impressions_callback queue.
   * NOTE(review): topologyName is currently unused here — confirm whether the kestrel
   * queue name should be namespaced per topology like the kafka consumer ids are.
   */
  private static KestrelThriftSpout buildAdImpressionsSpout(String topologyName) {
    KestrelThriftSpout spout =
        new KestrelThriftSpout(Constants.KESTREL_HOST_LIST, Constants.KESTREL_PORT,
            "ad_impressions_callback", new AdImpressionScheme());
    return spout;
  }

  /**
   * Kafka spout over the ad_engagements topic; the consumer id embeds the topology
   * name so parallel deployments don't share offsets.
   */
  private static KafkaSpout buildAdEngagementsSpout(String topologyName) {
    SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts(
        "szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"),
        "ad_engagements",
        "/kafkaconsumer",
        topologyName + "-ad_engagements");
    spoutConfig.scheme = new SchemeAsMultiScheme(new AdEngagementScheme());
    // -1: start from the latest offset.
    spoutConfig.forceStartOffsetTime(-1);
    spoutConfig.zkPort = 2181;
    return new KafkaSpout(spoutConfig);
  }

  /** Parses command-line args, builds the three spout/bolt chains, and submits the topology. */
  public static void main(String[] args) throws UnknownHostException,
      AlreadyAliveException, InvalidTopologyException {
    if (!new ArgScanner().parse(Arrays.asList(args))) {
      throw new RuntimeException("Failed to parse arguments.");
    }
    TopologyBuilder builder = new TopologyBuilder();
    // Each stream is grouped by (reqId, clientId) so tuples for the same request
    // land on the same bolt instance.
    builder.setSpout("requests",
        buildAdRequestsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
    builder.setBolt("processRequests",
        new ProcessRequestsBolt(), STDOUT_PARALLELISM.get())
        .fieldsGrouping("requests", new Fields("reqId", "clientId"));
    builder.setSpout("impressions",
        buildAdImpressionsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
    builder.setBolt("processImpressions",
        new ProcessImpresssionsBolt(), STDOUT_PARALLELISM.get())
        .fieldsGrouping("impressions", new Fields("reqId", "clientId"));
    builder.setSpout("engagements",
        buildAdEngagementsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
    builder.setBolt("processEngagements",
        new ProcessEngagementsBolt(), STDOUT_PARALLELISM.get())
        .fieldsGrouping("engagements", new Fields("reqId", "clientId"));
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(NUMBER_WORKERS.get());
    conf.setMaxSpoutPending(MAX_SPOUT_PENDING.get());
    conf.setNumAckers(NUMBER_ACKERS.get());
    conf.setTeamEmail(TEAM_EMAIL.get());
    conf.setTeamName(TEAM_NAME.get());
    conf.setTopologyProjectName(PROJECT_NAME.get());
    StormSubmitter.submitTopology(TOPOLOGY_NAME.get(), conf, builder.createTopology());
  }
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import com.twitter.ads.eventstream.ImpressionCallbackEvent;
import com.twitter.ads.logging.AdEngagementLogEntry;
import com.twitter.ads.logging.AdRequestLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey;
/**
 * Static mappers that project thrift log/event structs onto the compact
 * exp-rollup key structs emitted by the spout schemes.
 */
final class ExpRollupRTUtil {

  private ExpRollupRTUtil() {
    // Static helpers only.
  }

  /** Projects an ad request log entry onto an {@code ExpRollupRTReqKey}. */
  public static ExpRollupRTReqKey createRequestKey(AdRequestLogEntry adReq) {
    ExpRollupRTReqKey key = new ExpRollupRTReqKey();
    key.time = adReq.getTime();
    key.reqId = adReq.getRequestId();
    key.clientId = adReq.getClientInfo().getClientId();
    key.expIds = adReq.getExperimentKey().getExperimentIds();
    key.gender = adReq.getGender();
    key.countryRegion = adReq.getUserCountryRegion();
    return key;
  }

  /** Projects an impression callback event onto an {@code ExpRollupRTImpCallBackKey}. */
  public static ExpRollupRTImpCallBackKey
      createImpressionCallbackKey(ImpressionCallbackEvent icbEvt) {
    ExpRollupRTImpCallBackKey key = new ExpRollupRTImpCallBackKey();
    // Every field is sourced from the event's nested impression data struct.
    key.time = icbEvt.getImpressionData().getImpressionEpochTimeMilliSec();
    key.reqId = icbEvt.getImpressionData().getRequestId();
    key.clientId = icbEvt.getImpressionData().getClientId();
    key.impressionId = icbEvt.getImpressionData().getImpressionId();
    key.dispLoc = icbEvt.getImpressionData().getDisplayLocation();
    key.gender = icbEvt.getImpressionData().getGender();
    key.expIds = icbEvt.getImpressionData().getExperimentKey().getExperimentIds();
    key.countryRegion = icbEvt.getImpressionData().getUserCountryRegion();
    return key;
  }

  /** Projects an ad engagement log entry onto an {@code ExpRollupRTEngKey}. */
  public static ExpRollupRTEngKey createEngagementKey(AdEngagementLogEntry adEng) {
    ExpRollupRTEngKey key = new ExpRollupRTEngKey();
    key.time = adEng.getTime();
    key.reqId = adEng.getRequestId();
    key.clientId = adEng.getClientInfo().getClientId();
    key.engType = adEng.getEngagement().getType();
    key.impressionId = adEng.getImpressionId();
    return key;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment