Created
November 10, 2013 00:24
-
-
Save amedina/7391993 to your computer and use it in GitHub Desktop.
Skeleton of STORM Exp Rollup Job
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.logging.AdEngagementLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdEngagementScheme implements Scheme { | |
/** | |
* Thrift deserializer for AdEngagementLogEntry | |
* @param bytes | |
* @return | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
AdEngagementLogEntry engagement = new AdEngagementLogEntry(); | |
thriftDeserializer.deserialize(engagement, bytes); | |
ExpRollupRTEngKey engKey = ExpRollupRTUtil.createEngagementKey(engagement); | |
return new Values(engKey, engKey.reqId, engKey.clientId); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("engKey", "reqId", "clientId"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.eventstream.ImpressionCallbackEvent; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdImpressionScheme implements Scheme { | |
/** | |
* Thrift deserializer for ImpressionCallbackEvent | |
* @param bytes | |
* @return Tuple | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
ImpressionCallbackEvent impression = new ImpressionCallbackEvent(); | |
thriftDeserializer.deserialize(impression, bytes); | |
ExpRollupRTImpCallBackKey impKey = | |
ExpRollupRTUtil.createImpressionCallbackKey(impression); | |
tuple = new Values(impKey, impKey.reqId, impKey.clientId); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("impKey", "reqId", "clientId"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.logging.AdShardRequestLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdRequestScheme implements Scheme { | |
/** | |
* Thrift deserializer for AdShardRequestLogEntry | |
* @param bytes | |
* @return Tuple | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
AdShardRequestLogEntry request = new AdShardRequestLogEntry(); | |
thriftDeserializer.deserialize(request, bytes); | |
ExpRollupRTReqKey reqKey = | |
ExpRollupRTUtil.createRequestKey(request.getAdRequestLogEntry()); | |
tuple = new Values(reqKey, reqKey.reqId, reqKey.clientId); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("requestKey", "reqId", "clientId"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Library target: all sources matching *.java, plus the Storm, Thrift and
# ad-event dependencies they are built against.
java_library(
  name = 'exprolluprttopologylib',
  sources = globs("*.java"),
  dependencies = [
    # Storm core and spout integrations
    pants('3rdparty/storm'),
    pants('3rdparty/storm:storm-kafka'),
    pants('3rdparty/storm:storm-kestrel'),
    # Thrift runtime
    pants('3rdparty:thrift'),
    pants('3rdparty:util-thrift'),
    # Thrift definitions referenced by the sources
    pants('src/thrift/com/twitter/ads/eventstream:eventstream'),
    pants('src/thrift/com/twitter/ads/adserver:adserver_log'),
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
    # Command-line argument parsing
    pants('src/java/com/twitter/common/args'),
  ],
)
# Deployable binary for the topology. `main` must be the fully qualified name
# of the class that declares main(); ExpRollupRTTopology is declared in
# package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt.
jvm_binary(name = 'exprolluprttopology',
  main = 'com.twitter.ads.batch.experimental.amedina.storm.exprolluprt.ExpRollupRTTopology',
  dependencies = [
    pants(':exprolluprttopologylib')
  ],
  # Artifacts listed here are excluded from the deploy bundle
  # (presumably because the Storm cluster provides them -- confirm).
  deploy_excludes = [
    exclude('storm', 'storm'),
    exclude('log4j', 'log4j'),
    exclude('org.slf4j', 'slf4j-api'),
    exclude('org.slf4j', 'slf4j-jdk14'),
    exclude('org.slf4j', 'slf4j-log4j12'),
  ],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.net.UnknownHostException; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
// Enable this when deserializing | |
// import com.twitter.ads.logging.AdEngagementLogEntry; | |
import com.twitter.common.args.Arg; | |
import com.twitter.common.args.ArgScanner; | |
import com.twitter.common.args.CmdLine; | |
import backtype.storm.Config; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.generated.AlreadyAliveException; | |
import backtype.storm.generated.InvalidTopologyException; | |
import backtype.storm.spout.KestrelThriftSpout; | |
import backtype.storm.spout.SchemeAsMultiScheme; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.BasicOutputCollector; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseBasicBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import storm.kafka.KafkaConfig; | |
import storm.kafka.KafkaSpout; | |
import storm.kafka.SpoutConfig; | |
public final class ExpRollupRTTopology { | |
// Can't be instantiated | |
private ExpRollupRTTopology() { | |
} | |
@CmdLine(name = "topology_name", help = "Experiment Rollup Real-time") | |
public static final Arg<String> TOPOLOGY_NAME = | |
Arg.create(System.getProperty("user.name") + "_expRollupRT_STRM"); | |
@CmdLine(name = "max_spout_pending", help = "Max number of unacked tuples from the spout") | |
public static final Arg<Integer> MAX_SPOUT_PENDING = Arg.create(10); | |
@CmdLine(name = "spout0_parallelism", help = "Parallelism hint for spout0") | |
public static final Arg<Integer> SPOUT0_PARALLELISM = Arg.create(10); | |
@CmdLine(name = "stdout_parallelism", help = "Parallelism hint for stdout bolt") | |
public static final Arg<Integer> STDOUT_PARALLELISM = Arg.create(10); | |
@CmdLine(name = "number_workers", help = "Number of workers in the topology") | |
public static final Arg<Integer> NUMBER_WORKERS = Arg.create(3); | |
@CmdLine(name = "number_ackers", help = "Number of ackers in the topology") | |
public static final Arg<Integer> NUMBER_ACKERS = Arg.create(1); | |
@CmdLine(name = "team_name", help = "Team which owns this topology") | |
public static final Arg<String> TEAM_NAME = Arg.create("storm"); | |
@CmdLine(name = "team_email", help = "Team Email which owns this topology") | |
public static final Arg<String> TEAM_EMAIL = Arg.create("streaming-compute@twitter.com"); | |
@CmdLine(name = "project_name", help = "Topology part of which project/feature work") | |
public static final Arg<String> PROJECT_NAME = Arg.create("MyProject"); | |
private static final class Constants { | |
/** Zookeeper for discovering kafka brokers */ | |
public static final String KAFKA_BROKER_ZK_HOSTPORT = "szookeeper.smf1.twitter.com:2181"; | |
/** Path to kafka brokers in zookeeper */ | |
public static final String KAFKA_BROKER_ZK_PATH = "/"; | |
/** Zookeeper root */ | |
public static final String KAFKA_CONSUMER_ZK_PATH = "/kafkaconsumer"; | |
/** Zookeeper for discovering kafka brokers */ | |
public static final List<String> KESTREL_HOST_LIST = | |
Arrays.asList("smf1-ajt-03-sr1.prod.twitter.com"); | |
/** Path to kafka brokers in zookeeper */ | |
public static final int KESTREL_PORT = 2229; | |
private Constants() { | |
} | |
} | |
private static class ProcessRequestsBolt extends BaseBasicBolt { | |
private static final Logger LOG = LoggerFactory.getLogger(ProcessRequestsBolt.class.getName()); | |
private Random random = new Random(); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
//collector.emit(new Values(reqKey, reqKey.reqId, reqKey.clientId, 1L)) | |
Float r = random.nextFloat(); | |
if (r <= 0.1) { | |
System.out.println("***** Emitting request! ****"); | |
} | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
//outputFieldsDeclarer.declare(new Fields("requestKey", "reqId", "clientId", "count")) | |
} | |
} | |
private static class ProcessImpresssionsBolt extends BaseBasicBolt { | |
private static final Logger | |
LOG = LoggerFactory.getLogger(ProcessImpresssionsBolt.class.getName()); | |
private Random random = new Random(); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
Float r = random.nextFloat(); | |
if (r <= 0.1) { | |
System.out.println("***** Emitting Impression! *****"); | |
} | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
} | |
} | |
private static class ProcessEngagementsBolt extends BaseBasicBolt { | |
private Random random = new Random(); | |
private static final Logger | |
LOG = LoggerFactory.getLogger(ProcessEngagementsBolt.class.getName()); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
Float r = random.nextFloat(); | |
if (r <= 0.1) { | |
System.out.println("***** Emitting Engagement! *****"); | |
} | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
} | |
} | |
private static KafkaSpout buildAdRequestsSpout(String topologyName) { | |
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts( | |
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"), | |
"adshard_requests", | |
"/kafkaconsumer", | |
topologyName + "-adshard_requests"); | |
spoutConfig.scheme = new SchemeAsMultiScheme(new AdRequestScheme()); | |
spoutConfig.forceStartOffsetTime(-1); | |
spoutConfig.zkPort = 2181; | |
return new KafkaSpout(spoutConfig); | |
} | |
private static KestrelThriftSpout buildAdImpressionsSpout(String topologyName) { | |
KestrelThriftSpout spout = | |
new KestrelThriftSpout(Constants.KESTREL_HOST_LIST, Constants.KESTREL_PORT, | |
"ad_impressions_callback", new AdImpressionScheme()); | |
return spout; | |
} | |
private static KafkaSpout buildAdEngagementsSpout(String topologyName) { | |
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts( | |
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"), | |
"ad_engagements", | |
"/kafkaconsumer", | |
topologyName + "-ad_engagements"); | |
spoutConfig.scheme = new SchemeAsMultiScheme(new AdEngagementScheme()); | |
spoutConfig.forceStartOffsetTime(-1); | |
spoutConfig.zkPort = 2181; | |
return new KafkaSpout(spoutConfig); | |
} | |
/** Submits toplogy */ | |
public static void main(String[] args) throws UnknownHostException, | |
AlreadyAliveException, InvalidTopologyException { | |
if (!new ArgScanner().parse(Arrays.asList(args))) { | |
throw new RuntimeException("Failed to parse arguments."); | |
} | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("requests", | |
buildAdRequestsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
builder.setBolt("processRequests", | |
new ProcessRequestsBolt(), STDOUT_PARALLELISM.get()) | |
.fieldsGrouping("requests", new Fields("reqId", "clientId")); | |
builder.setSpout("impressions", | |
buildAdImpressionsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
builder.setBolt("processImpressions", | |
new ProcessImpresssionsBolt(), STDOUT_PARALLELISM.get()) | |
.fieldsGrouping("impressions", new Fields("reqId", "clientId")); | |
builder.setSpout("engagements", | |
buildAdEngagementsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
builder.setBolt("processEngagements", | |
new ProcessEngagementsBolt(), STDOUT_PARALLELISM.get()) | |
.fieldsGrouping("engagements", new Fields("reqId", "clientId")); | |
Config conf = new Config(); | |
conf.setDebug(false); | |
conf.setNumWorkers(NUMBER_WORKERS.get()); | |
conf.setMaxSpoutPending(MAX_SPOUT_PENDING.get()); | |
conf.setNumAckers(NUMBER_ACKERS.get()); | |
conf.setTeamEmail(TEAM_EMAIL.get()); | |
conf.setTeamName(TEAM_NAME.get()); | |
conf.setTopologyProjectName(PROJECT_NAME.get()); | |
StormSubmitter.submitTopology(TOPOLOGY_NAME.get(), conf, builder.createTopology()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import com.twitter.ads.eventstream.ImpressionCallbackEvent; | |
import com.twitter.ads.logging.AdEngagementLogEntry; | |
import com.twitter.ads.logging.AdRequestLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey; | |
final class ExpRollupRTUtil { | |
private ExpRollupRTUtil() { | |
} | |
public static ExpRollupRTReqKey createRequestKey(AdRequestLogEntry adReq) { | |
ExpRollupRTReqKey reqKey = new ExpRollupRTReqKey(); | |
reqKey.time = adReq.getTime(); | |
reqKey.reqId = adReq.getRequestId(); | |
reqKey.clientId = adReq.getClientInfo().getClientId(); | |
reqKey.expIds = adReq.getExperimentKey().getExperimentIds(); | |
reqKey.gender = adReq.getGender(); | |
reqKey.countryRegion = adReq.getUserCountryRegion(); | |
return reqKey; | |
} | |
public static ExpRollupRTImpCallBackKey | |
createImpressionCallbackKey(ImpressionCallbackEvent icbEvt) { | |
ExpRollupRTImpCallBackKey impKey = new ExpRollupRTImpCallBackKey(); | |
impKey.time = icbEvt.getImpressionData().getImpressionEpochTimeMilliSec(); | |
impKey.clientId = icbEvt.getImpressionData().getClientId(); | |
impKey.reqId = icbEvt.getImpressionData().getRequestId(); | |
impKey.impressionId = icbEvt.getImpressionData().getImpressionId(); | |
impKey.dispLoc = icbEvt.getImpressionData().getDisplayLocation(); | |
impKey.gender = icbEvt.getImpressionData().getGender(); | |
impKey.expIds = icbEvt.getImpressionData().getExperimentKey().getExperimentIds(); | |
impKey.countryRegion = icbEvt.getImpressionData().getUserCountryRegion(); | |
return impKey; | |
} | |
public static ExpRollupRTEngKey createEngagementKey(AdEngagementLogEntry adEng) { | |
ExpRollupRTEngKey engKey = new ExpRollupRTEngKey(); | |
engKey.time = adEng.getTime(); | |
engKey.clientId = adEng.getClientInfo().getClientId(); | |
engKey.engType = adEng.getEngagement().getType(); | |
engKey.reqId = adEng.getRequestId(); | |
engKey.impressionId = adEng.getImpressionId(); | |
return engKey; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment