Skip to content

Instantly share code, notes, and snippets.

@amedina
Created November 18, 2013 18:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amedina/7532402 to your computer and use it in GitHub Desktop.
Save amedina/7532402 to your computer and use it in GitHub Desktop.
Storm: Light xprollup RT
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.logging.AdEngagementLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdEngagementScheme implements Scheme {
  /**
   * Deserializes a Thrift-binary-encoded {@link AdEngagementLogEntry} payload into a
   * one-field Storm tuple containing an {@code ExpRollupRTEngKey}.
   *
   * @param bytes raw Thrift (binary protocol) message bytes from the spout
   * @return a one-element tuple holding the engagement key, or {@code null} when the
   *         payload cannot be decoded (a null return drops the message)
   */
  public List<Object> deserialize(byte[] bytes) {
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      AdEngagementLogEntry engagement = new AdEngagementLogEntry();
      thriftDeserializer.deserialize(engagement, bytes);
      return new Values(ExpRollupRTUtil.createEngagementKey(engagement));
    } catch (Exception e) {
      // Fix: include the exception in the diagnostic instead of discarding it entirely.
      System.err.println("Got exception while dealing with a message: " + e);
      return null;
    }
  }

  /** @return the single output field name emitted by this scheme. */
  public Fields getOutputFields() {
    return new Fields("engKey");
  }
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.eventstream.ImpressionCallbackEvent;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdImpressionScheme implements Scheme {
  /**
   * Deserializes a Thrift-binary-encoded {@link ImpressionCallbackEvent} payload into a
   * one-field Storm tuple containing an {@code ExpRollupRTImpCallBackKey}.
   *
   * @param bytes raw Thrift (binary protocol) message bytes from the spout
   * @return a one-element tuple holding the impression-callback key, or {@code null}
   *         when the payload cannot be decoded (a null return drops the message)
   */
  public List<Object> deserialize(byte[] bytes) {
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      ImpressionCallbackEvent impression = new ImpressionCallbackEvent();
      thriftDeserializer.deserialize(impression, bytes);
      return new Values(ExpRollupRTUtil.createImpressionCallbackKey(impression));
    } catch (Exception e) {
      // Fix: include the exception in the diagnostic instead of discarding it entirely.
      System.err.println("Got exception while dealing with a message: " + e);
      return null;
    }
  }

  /** @return the single output field name emitted by this scheme. */
  public Fields getOutputFields() {
    return new Fields("impKey");
  }
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.util.List;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.twitter.ads.logging.AdShardRequestLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class AdRequestScheme implements Scheme {
  /**
   * Deserializes a Thrift-binary-encoded {@link AdShardRequestLogEntry} payload into a
   * one-field Storm tuple containing an {@code ExpRollupRTReqKey} built from the
   * embedded {@code AdRequestLogEntry}.
   *
   * @param bytes raw Thrift (binary protocol) message bytes from the spout
   * @return a one-element tuple holding the request key, or {@code null} when the
   *         payload cannot be decoded (a null return drops the message)
   */
  public List<Object> deserialize(byte[] bytes) {
    try {
      // TDeserializer is not documented as thread-safe, so build one per call.
      TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
      AdShardRequestLogEntry request = new AdShardRequestLogEntry();
      thriftDeserializer.deserialize(request, bytes);
      return new Values(ExpRollupRTUtil.createRequestKey(request.getAdRequestLogEntry()));
    } catch (Exception e) {
      // Fix: include the exception in the diagnostic instead of discarding it entirely.
      System.err.println("Got exception while dealing with a message: " + e);
      return null;
    }
  }

  /** @return the single output field name emitted by this scheme. */
  public Fields getOutputFields() {
    return new Fields("reqKey");
  }
}
# Library target compiling all topology sources in this directory:
# the three Scheme deserializers, ExpRollupRTUtil, and ExpRollupRTTopology.
java_library(name = 'exprolluprttopologylib',
dependencies = [
pants('3rdparty/storm'),
pants('3rdparty/storm:storm-kafka'),
pants('3rdparty/storm:storm-kestrel'),
pants('3rdparty:thrift'),
pants('3rdparty:util-thrift'),
pants('3rdparty:finagle-mysql'),
# Generated Thrift structs used by the schemes and the key-builder util.
pants('src/thrift/com/twitter/ads/eventstream:eventstream'),
pants('src/thrift/com/twitter/ads/adserver:adserver_log'),
pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
# Command-line flag parsing (@CmdLine / ArgScanner).
pants('src/java/com/twitter/common/args'),
],
sources = globs("*.java"),
)
# Deployable topology jar. Storm and logging jars are excluded because the
# Storm cluster supplies them at runtime.
jvm_binary(name = 'exprolluprttopology',
  # Fix: the main class lives in the package declared by the sources
  # (com.twitter.ads.batch.experimental.amedina.storm.exprolluprt), not
  # com.twitter.storm.samples.exprolluprt — the old FQN would fail at launch.
  main = 'com.twitter.ads.batch.experimental.amedina.storm.exprolluprt.ExpRollupRTTopology',
  dependencies = [
    pants(':exprolluprttopologylib')
  ],
  deploy_excludes = [
    exclude('storm', 'storm'),
    exclude('log4j', 'log4j'),
    exclude('org.slf4j', 'slf4j-api'),
    exclude('org.slf4j', 'slf4j-jdk14'),
    exclude('org.slf4j', 'slf4j-log4j12'),
  ],
)
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Enable this when deserializing
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey;
import com.twitter.common.args.Arg;
import com.twitter.common.args.ArgScanner;
import com.twitter.common.args.CmdLine;
import com.twitter.finagle.exp.mysql.Client;
import com.twitter.finagle.stats.NullStatsReceiver;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.KestrelThriftSpout;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
/**
 * Storm topology that builds light real-time experiment rollups: it consumes ad
 * requests and ad engagements from Kafka (impressions via Kestrel are wired but
 * commented out in main), counts occurrences per rollup key in memory, and every
 * {@code writePeriod} flushes the counts to MySQL via multi-row INSERT IGNORE.
 */
public final class ExpRollupRTTopology {
// Can't be instantiated
private ExpRollupRTTopology() {
}
// Number of VALUES groups accumulated before each multi-row INSERT is issued.
private static int writeBatch = 100;
// Flush interval for the in-memory count maps, in milliseconds (15 minutes).
private static int writePeriod = 15 * 60 * 1000;
@CmdLine(name = "topology_name", help = "Experiment Rollup Real-time")
public static final Arg<String> TOPOLOGY_NAME = Arg.create("light_exprollup_rt");
//Arg.create(System.getProperty("user.name") + "_expRollupRT_STORM_GG");
@CmdLine(name = "max_spout_pending", help = "Max number of unacked tuples from the spout")
public static final Arg<Integer> MAX_SPOUT_PENDING = Arg.create(10);
@CmdLine(name = "spout0_parallelism", help = "Parallelism hint for spout0")
public static final Arg<Integer> SPOUT0_PARALLELISM = Arg.create(10);
@CmdLine(name = "stdout_parallelism", help = "Parallelism hint for stdout bolt")
public static final Arg<Integer> STDOUT_PARALLELISM = Arg.create(10);
@CmdLine(name = "number_workers", help = "Number of workers in the topology")
public static final Arg<Integer> NUMBER_WORKERS = Arg.create(3);
@CmdLine(name = "number_ackers", help = "Number of ackers in the topology")
public static final Arg<Integer> NUMBER_ACKERS = Arg.create(1);
@CmdLine(name = "team_name", help = "Team which owns this topology")
public static final Arg<String> TEAM_NAME = Arg.create("Ads");
@CmdLine(name = "team_email", help = "Team Email which owns this topology")
public static final Arg<String> TEAM_EMAIL = Arg.create("amedina@twitter.com");
@CmdLine(name = "project_name", help = "Topology part of which project/feature work")
public static final Arg<String> PROJECT_NAME = Arg.create("Experiments DashBoard RT");
/** Connection constants for the Kafka/Kestrel sources. */
private static final class Constants {
/** Zookeeper for discovering kafka brokers */
public static final String KAFKA_BROKER_ZK_HOSTPORT = "szookeeper.smf1.twitter.com:2181";
/** Path to kafka brokers in zookeeper */
public static final String KAFKA_BROKER_ZK_PATH = "/";
/** Zookeeper root */
public static final String KAFKA_CONSUMER_ZK_PATH = "/kafkaconsumer";
// NOTE(review): the three constants above are unused by the spout builders
// below, which pass their own literals — confirm before deleting either copy.
/** Zookeeper for discovering kafka brokers */
public static final List<String> KESTREL_HOST_LIST =
Arrays.asList(
"smf1-ajo-04-sr1.prod.twitter.com",
"smf1-ajt-03-sr1.prod.twitter.com",
"smf1-aju-03-sr1.prod.twitter.com",
"smf1-ajv-03-sr1.prod.twitter.com",
"smf1-ajw-04-sr1.prod.twitter.com",
"smf1-ajy-04-sr1.prod.twitter.com",
"smf1-ajz-04-sr1.prod.twitter.com",
"smf1-akp-04-sr1.prod.twitter.com",
"smf1-akq-04-sr1.prod.twitter.com",
"smf1-akr-04-sr1.prod.twitter.com",
"smf1-aks-04-sr1.prod.twitter.com",
"smf1-akt-04-sr1.prod.twitter.com",
"smf1-akv-04-sr1.prod.twitter.com",
"smf1-akw-04-sr1.prod.twitter.com",
"smf1-akx-03-sr1.prod.twitter.com",
"smf1-aky-04-sr1.prod.twitter.com",
"smf1-akz-04-sr1.prod.twitter.com",
"smf1-ala-03-sr1.prod.twitter.com",
"smf1-ala-04-sr1.prod.twitter.com",
"smf1-alb-03-sr1.prod.twitter.com",
"smf1-alc-03-sr1.prod.twitter.com",
"smf1-alc-04-sr1.prod.twitter.com",
"smf1-ald-03-sr1.prod.twitter.com",
"smf1-ald-04-sr1.prod.twitter.com",
"smf1-ale-01-sr1.prod.twitter.com",
"smf1-alf-30-sr1.prod.twitter.com",
"smf1-alg-03-sr1.prod.twitter.com",
"smf1-alg-04-sr1.prod.twitter.com",
"smf1-alh-03-sr1.prod.twitter.com",
"smf1-ali-15-sr1.prod.twitter.com",
"smf1-ali-30-sr1.prod.twitter.com",
"smf1-alk-04-sr1.prod.twitter.com",
"smf1-all-03-sr1.prod.twitter.com",
"smf1-alm-04-sr1.prod.twitter.com",
"smf1-aln-01-sr1.prod.twitter.com",
"smf1-alp-03-sr1.prod.twitter.com",
"smf1-alp-04-sr1.prod.twitter.com",
"smf1-alq-03-sr1.prod.twitter.com",
"smf1-alq-04-sr1.prod.twitter.com",
"smf1-alr-03-sr1.prod.twitter.com",
"smf1-alr-04-sr1.prod.twitter.com",
"smf1-als-03-sr1.prod.twitter.com",
"smf1-alt-30-sr1.prod.twitter.com",
"smf1-alu-01-sr1.prod.twitter.com",
"smf1-alv-15-sr1.prod.twitter.com",
"smf1-alw-01-sr1.prod.twitter.com",
"smf1-alx-30-sr1.prod.twitter.com",
"smf1-aly-03-sr1.prod.twitter.com",
"smf1-alz-01-sr1.prod.twitter.com",
"smf1-alz-15-sr1.prod.twitter.com",
"smf1-anm-12-sr1.prod.twitter.com",
"smf1-ano-11-sr1.prod.twitter.com",
"smf1-anp-12-sr1.prod.twitter.com",
"smf1-anq-38-sr1.prod.twitter.com",
"smf1-ans-13-sr1.prod.twitter.com",
"smf1-aws-24-sr1.prod.twitter.com",
"smf1-awu-11-sr1.prod.twitter.com",
"smf1-awv-38-sr1.prod.twitter.com",
"smf1-aww-38-sr1.prod.twitter.com",
"smf1-axh-24-sr1.prod.twitter.com",
"smf1-axi-40-sr1.prod.twitter.com",
"smf1-axk-09-sr1.prod.twitter.com",
"smf1-axt-14-sr1.prod.twitter.com",
"smf1-axu-20-sr1.prod.twitter.com",
"smf1-axv-12-sr1.prod.twitter.com",
"smf1-axv-21-sr1.prod.twitter.com",
"smf1-axv-30-sr1.prod.twitter.com",
"smf1-axv-35-sr1.prod.twitter.com"
);
/** Kestrel thrift port shared by every host above. */
public static final int KESTREL_PORT = 2229;
private Constants() {
}
}
/**
 * Counts ad requests per ExpRollupRTReqKey and flushes the counts to the
 * exp_rollup_rt_requests table every writePeriod milliseconds.
 */
private static class ProcessRequestsBolt extends BaseBasicBolt {
private static final Logger LOG = LoggerFactory.getLogger(ProcessRequestsBolt.class.getName());
// NOTE(review): this bolt uses a ConcurrentHashMap while the impression and
// engagement bolts use plain HashMaps; each executor is single-threaded, so
// the inconsistency looks accidental — confirm before aligning them.
private Map<ExpRollupRTReqKey, Long> requestsMap
= new ConcurrentHashMap<ExpRollupRTReqKey, Long>();
// Start of the current accumulation window; reset after each flush.
private Long startTime = System.currentTimeMillis();
@Override
public void prepare(Map conf, TopologyContext context) {
super.prepare(conf, context);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// NOTE(review): a new SimpleDateFormat is built per tuple; it could be made
// a per-instance field since this executor is single-threaded.
TimeZone utcZone = TimeZone.getTimeZone("UTC");
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
dateFormat.setTimeZone(utcZone);
ExpRollupRTReqKey reqKey = (ExpRollupRTReqKey) (tuple.getValue(0));
Long count = requestsMap.containsKey(reqKey) ? requestsMap.get(reqKey) : 0L;
requestsMap.put(reqKey, count + 1);
Long currentTime = System.currentTimeMillis();
// Rows are stamped with the flush time, not each event's time.
String dateString = dateFormat.format(new Date());
// Flush is driven by tuple arrival: if no tuples arrive, no flush happens.
if (currentTime - startTime >= writePeriod) {
System.out.println("REQUESTS MAP SIZE: " + requestsMap.size());
if (requestsMap.size() > 0) {
writeRequestsMapToDB(requestsMap, dateString);
requestsMap.clear();
}
startTime = currentTime;
}
}
/**
 * Writes the accumulated request counts as batched multi-row INSERT IGNORE
 * statements, one row per (key, expId) pair.
 *
 * @param requests   per-key request counts accumulated this window
 * @param dateString UTC timestamp applied to every inserted row
 */
public void writeRequestsMapToDB(Map<ExpRollupRTReqKey, Long> requests,
String dateString) {
Random r = new Random();
// NOTE(review): database host/user/password are hard-coded in source —
// move to configuration / secret storage.
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306",
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF,
new NullStatsReceiver());
// NOTE(review): values (including the geo string) are concatenated into the
// SQL text unescaped rather than bound as parameters.
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_requests (date, "
+ "client_id, geo, display_loc, exp_id, "
+ "count) VALUES ";
// mapSize counts down so the last entry forces a final partial-batch flush.
int mapSize = requests.size();
int writeCounter = 0;
int totalCounter = 0;
StringBuilder valueStr = new StringBuilder();
for (Entry<ExpRollupRTReqKey, Long> entry : requests.entrySet()) {
mapSize -= 1;
ExpRollupRTReqKey key = entry.getKey();
Long reqCount = entry.getValue();
List<Integer> expIds = key.expIds;
int clientId = key.clientId;
int displayLoc = key.displayLoc;
String geo = key.countryRegion;
// One row per experiment id carried by the key, each with the full count.
for (Integer expId : expIds) {
if (writeCounter > 0) {
valueStr.append(",");
}
writeCounter += 1;
totalCounter += 1;
valueStr.append("('");
valueStr.append(dateString + "'," + clientId + ",'");
valueStr.append(geo + "'," + displayLoc);
valueStr.append("," + expId + "," + reqCount + ")");
if (writeCounter % writeBatch == 0 || mapSize == 0) {
System.out.println("B: " + r.nextLong() + " "
+ multiInsertSQL + valueStr.toString());
try {
client.query(multiInsertSQL + " " + valueStr);
} catch (Exception ex) {
ex.printStackTrace();
}
writeCounter = 0;
valueStr = new StringBuilder();
}
}
}
// NOTE(review): the finagle-mysql Client is created per flush and never
// closed — confirm whether it leaks connections.
System.out.println("*** TOTAL REQUESTS INSERTION COUNTER: " + totalCounter);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// Terminal bolt: emits nothing downstream.
//outputFieldsDeclarer.declare(new Fields("requestKey", "reqId", "clientId", "count"))
}
}
/**
 * Counts impression callbacks per ExpRollupRTImpCallBackKey and flushes to the
 * exp_rollup_rt_impressions table every writePeriod milliseconds.
 */
private static class ProcessImpresssionsBolt extends BaseBasicBolt {
private static final Logger
LOG = LoggerFactory.getLogger(ProcessImpresssionsBolt.class.getName());
private Map<ExpRollupRTImpCallBackKey, Long> impressionsMap
= new HashMap<ExpRollupRTImpCallBackKey, Long>();
// Start of the current accumulation window; reset after each flush.
private Long startTime = System.currentTimeMillis();
@Override
public void prepare(Map conf, TopologyContext context) {
super.prepare(conf, context);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
ExpRollupRTImpCallBackKey impKey = (ExpRollupRTImpCallBackKey) (tuple.getValue(0));
Long count = impressionsMap.containsKey(impKey) ? impressionsMap.get(impKey) : 0L;
impressionsMap.put(impKey, count + 1);
TimeZone utcZone = TimeZone.getTimeZone("UTC");
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
dateFormat.setTimeZone(utcZone);
// Rows are stamped with the flush time, not each event's time.
String dateString = dateFormat.format(new Date());
Long currentTime = System.currentTimeMillis();
if (currentTime - startTime >= writePeriod) {
System.out.println("IMPRESSIONS MAP SIZE: " + impressionsMap.size());
if (impressionsMap.size() > 0) {
writeImpressionsMapToDB(impressionsMap, dateString);
impressionsMap.clear();
}
startTime = currentTime;
}
}
/**
 * Writes the accumulated impression counts as batched multi-row INSERT IGNORE
 * statements, one row per (key, expId) pair.
 *
 * @param impressions per-key impression counts accumulated this window
 * @param dateString  UTC timestamp applied to every inserted row
 */
public void writeImpressionsMapToDB(Map<ExpRollupRTImpCallBackKey, Long> impressions,
String dateString) {
// NOTE(review): hard-coded credentials and string-concatenated SQL; same
// concerns as in ProcessRequestsBolt.writeRequestsMapToDB.
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306",
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF,
new NullStatsReceiver());
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_impressions (date, "
+ "client_id, geo, display_loc, exp_id, "
+ "count) VALUES ";
int mapSize = impressions.size();
int writeCounter = 0;
int totalCounter = 0;
StringBuilder valueStr = new StringBuilder();
for (Entry<ExpRollupRTImpCallBackKey, Long> entry : impressions.entrySet()) {
mapSize -= 1;
ExpRollupRTImpCallBackKey key = entry.getKey();
List<Integer> expIds = key.expIds;
int clientId = key.clientId;
int displayLoc = key.displayLoc;
String geo = key.countryRegion;
Long impCount = entry.getValue();
for (Integer expId : expIds) {
if (writeCounter > 0) {
valueStr.append(",");
}
writeCounter += 1;
totalCounter += 1;
valueStr.append("('");
valueStr.append(dateString + "'," + clientId + ",'");
valueStr.append(geo + "'," + displayLoc);
valueStr.append("," + expId + "," + impCount + ")");
if (writeCounter % writeBatch == 0 || mapSize == 0) {
System.out.println(multiInsertSQL + valueStr.toString());
try {
client.query(multiInsertSQL + " " + valueStr);
} catch (Exception ex) {
ex.printStackTrace();
}
writeCounter = 0;
valueStr = new StringBuilder();
}
}
}
System.out.println("**** TOTAL IMPRESSIONS INSERTION COUNTER: " + totalCounter);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// Terminal bolt: emits nothing downstream.
}
}
/**
 * Counts engagements per ExpRollupRTEngKey and flushes to the
 * exp_rollup_rt_engagements table every writePeriod milliseconds.
 */
private static class ProcessEngagementsBolt extends BaseBasicBolt {
private Map<ExpRollupRTEngKey, Long> engagementsMap = new HashMap<ExpRollupRTEngKey, Long>();
// Start of the current accumulation window; reset after each flush.
private Long startTime = System.currentTimeMillis();
private static final Logger
LOG = LoggerFactory.getLogger(ProcessEngagementsBolt.class.getName());
@Override
public void prepare(Map conf, TopologyContext context) {
super.prepare(conf, context);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
ExpRollupRTEngKey engKey = (ExpRollupRTEngKey) (tuple.getValue(0));
Long count = engagementsMap.containsKey(engKey) ? engagementsMap.get(engKey) : 0L;
engagementsMap.put(engKey, count + 1);
TimeZone utcZone = TimeZone.getTimeZone("UTC");
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
dateFormat.setTimeZone(utcZone);
// Rows are stamped with the flush time, not each event's time.
String dateString = dateFormat.format(new Date());
Long currentTime = System.currentTimeMillis();
if (currentTime - startTime >= writePeriod) {
System.out.println("ENGAGEMENTS MAP SIZE: " + engagementsMap.size());
if (engagementsMap.size() > 0) {
writeEngagementsMapToDB(engagementsMap, dateString);
engagementsMap.clear();
}
startTime = currentTime;
}
}
/**
 * Writes the accumulated engagement counts as batched multi-row INSERT IGNORE
 * statements (no per-experiment fan-out: one row per key).
 *
 * @param engagements per-key engagement counts accumulated this window
 * @param dateString  UTC timestamp applied to every inserted row
 */
public void writeEngagementsMapToDB(Map<ExpRollupRTEngKey, Long> engagements,
String dateString) {
// NOTE(review): hard-coded credentials and string-concatenated SQL; same
// concerns as in ProcessRequestsBolt.writeRequestsMapToDB.
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306",
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF,
new NullStatsReceiver());
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_engagements (date, "
+ "client_id, engagement_type, count) VALUES ";
int mapSize = engagements.size();
int writeCounter = 0;
int totalCounter = 0;
StringBuilder valueStr = new StringBuilder();
for (Entry<ExpRollupRTEngKey, Long> entry : engagements.entrySet()) {
mapSize -= 1;
ExpRollupRTEngKey key = entry.getKey();
int clientId = key.clientId;
int engType = key.engagementType;
Long engCount = entry.getValue();
if (writeCounter > 0) {
valueStr.append(",");
}
writeCounter += 1;
totalCounter += 1;
valueStr.append("('");
valueStr.append(dateString + "'," + clientId + ",");
valueStr.append(engType + "," + engCount + ")");
if (writeCounter % writeBatch == 0 || mapSize == 0) {
System.out.println(multiInsertSQL + valueStr.toString());
try {
client.query(multiInsertSQL + " " + valueStr);
} catch (Exception ex) {
ex.printStackTrace();
}
writeCounter = 0;
valueStr = new StringBuilder();
}
}
System.out.println("**** TOTAL ENGAGEMENTS INSERTION COUNTER: " + totalCounter);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// Terminal bolt: emits nothing downstream.
}
}
/**
 * Builds the Kafka spout reading the adshard_requests topic, decoding tuples
 * with AdRequestScheme and starting from the latest offset (-1).
 */
private static KafkaSpout buildAdRequestsSpout(String topologyName) {
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts(
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"),
"adshard_requests",
"/kafkaconsumer",
topologyName + "-adshard_requests");
spoutConfig.scheme = new SchemeAsMultiScheme(new AdRequestScheme());
spoutConfig.forceStartOffsetTime(-1);
spoutConfig.zkPort = 2181;
return new KafkaSpout(spoutConfig);
}
/**
 * Builds the Kestrel spout reading ad_impressions_callback.
 * The topologyName parameter is currently unused.
 */
private static KestrelThriftSpout buildAdImpressionsSpout(String topologyName) {
KestrelThriftSpout spout =
new KestrelThriftSpout(Constants.KESTREL_HOST_LIST, Constants.KESTREL_PORT,
"ad_impressions_callback", new AdImpressionScheme());
return spout;
}
/**
 * Builds the Kafka spout reading the ad_engagements topic, decoding tuples
 * with AdEngagementScheme and starting from the latest offset (-1).
 */
private static KafkaSpout buildAdEngagementsSpout(String topologyName) {
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts(
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"),
"ad_engagements",
"/kafkaconsumer",
topologyName + "-ad_engagements");
spoutConfig.scheme = new SchemeAsMultiScheme(new AdEngagementScheme());
spoutConfig.forceStartOffsetTime(-1);
spoutConfig.zkPort = 2181;
return new KafkaSpout(spoutConfig);
}
/** Parses command-line args, wires spouts to bolts, and submits the topology. */
public static void main(String[] args) throws UnknownHostException,
AlreadyAliveException, InvalidTopologyException {
if (!new ArgScanner().parse(Arrays.asList(args))) {
throw new RuntimeException("Failed to parse arguments.");
}
TopologyBuilder builder = new TopologyBuilder();
// Requests: fields-grouped on reqKey so equal keys land on the same bolt task.
builder.setSpout("requests",
buildAdRequestsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
builder.setBolt("processRequests",
new ProcessRequestsBolt(), STDOUT_PARALLELISM.get())
.fieldsGrouping("requests", new Fields("reqKey"));
// Impressions branch intentionally disabled.
// builder.setSpout("impressions",
// buildAdImpressionsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
// builder.setBolt("processImpressions",
// new ProcessImpresssionsBolt(), STDOUT_PARALLELISM.get())
// .fieldsGrouping("impressions", new Fields("impKey"));
builder.setSpout("engagements",
buildAdEngagementsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get());
builder.setBolt("processEngagements",
new ProcessEngagementsBolt(), STDOUT_PARALLELISM.get())
.fieldsGrouping("engagements", new Fields("engKey"));
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(NUMBER_WORKERS.get());
conf.setMaxSpoutPending(MAX_SPOUT_PENDING.get());
conf.setNumAckers(NUMBER_ACKERS.get());
conf.setTeamEmail(TEAM_EMAIL.get());
conf.setTeamName(TEAM_NAME.get());
conf.setTopologyProjectName(PROJECT_NAME.get());
conf.setTopologyCapTicket("CAP-1586");
StormSubmitter.submitTopology(TOPOLOGY_NAME.get(), conf, builder.createTopology());
}
}
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt;
import java.io.Serializable;
import com.twitter.ads.eventstream.ImpressionCallbackEvent;
import com.twitter.ads.logging.AdEngagementLogEntry;
import com.twitter.ads.logging.AdRequestLogEntry;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey;
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey;
/**
 * Static builders that map raw ad log/event Thrift structs onto the compact
 * rollup key structs used by the real-time experiment rollup topology.
 */
public final class ExpRollupRTUtil implements Serializable {

  private ExpRollupRTUtil() {
    // Utility holder; never instantiated.
  }

  /**
   * Builds the request rollup key from an {@link AdRequestLogEntry}.
   *
   * @param adReq source request log entry
   * @return key carrying client id, experiment ids, country code and display location
   */
  public static ExpRollupRTReqKey createRequestKey(AdRequestLogEntry adReq) {
    ExpRollupRTReqKey key = new ExpRollupRTReqKey();
    key.expIds = adReq.getExperimentKey().getExperimentIds();
    key.clientId = adReq.getClientInfo().getClientId();
    key.countryRegion = adReq.getClientInfo().getCountryCode();
    key.displayLoc = adReq.getRequest().getDisplayLocation().getValue();
    return key;
  }

  /**
   * Builds the impression-callback rollup key from an {@link ImpressionCallbackEvent}.
   *
   * @param icbEvt source impression callback event
   * @return key carrying client id, experiment ids, country code and display location
   */
  public static ExpRollupRTImpCallBackKey
      createImpressionCallbackKey(ImpressionCallbackEvent icbEvt) {
    ExpRollupRTImpCallBackKey key = new ExpRollupRTImpCallBackKey();
    key.expIds = icbEvt.getImpressionData().getExperimentKey().getExperimentIds();
    key.clientId = icbEvt.getImpressionData().getClientId();
    key.countryRegion = icbEvt.getClientInfo().getCountryCode();
    key.displayLoc = icbEvt.getImpressionData().getDisplayLocation().getValue();
    return key;
  }

  /**
   * Builds the engagement rollup key from an {@link AdEngagementLogEntry}.
   *
   * @param adEng source engagement log entry
   * @return key carrying client id, country code and engagement type
   */
  public static ExpRollupRTEngKey createEngagementKey(AdEngagementLogEntry adEng) {
    ExpRollupRTEngKey key = new ExpRollupRTEngKey();
    key.countryRegion = adEng.getClientInfo().getCountryCode();
    key.clientId = adEng.getClientInfo().getClientId();
    key.engagementType = adEng.getEngagement().getType().getValue();
    return key;
  }
}
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na]
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20]
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20]
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
... 29 common frames omitted
2013-11-11 05:39:57 . b.s.util [INFO] Halting process: ("Error on initialization")
2013-11-11 05:41:20 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 58f8104b-5073-4ebb-aea6-062fb62fd0f5 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2013-11-11 05:41:20 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests.
2013-11-11 05:41:20 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:41:20 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0
2013-11-11 05:41:20 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2013-11-11 05:41:20 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:41:21 . b.s.d.worker [ERROR] Error on initialization of server mk-worker
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
Serialization trace:
val$r (com.twitter.chill.KryoInstantiator$4)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na]
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na]
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20]
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20]
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
... 29 common frames omitted
2013-11-11 05:41:21 . b.s.util [INFO] Halting process: ("Error on initialization")
2013-11-11 05:42:43 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 1a3b7700-4368-4044-a82b-d77c785156e4 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2013-11-11 05:42:44 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests.
2013-11-11 05:42:44 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:42:44 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0
2013-11-11 05:42:44 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2013-11-11 05:42:44 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:42:46 . b.s.d.worker [ERROR] Error on initialization of server mk-worker
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
Serialization trace:
val$r (com.twitter.chill.KryoInstantiator$4)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na]
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na]
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20]
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20]
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
... 29 common frames omitted
2013-11-11 05:42:46 . b.s.util [INFO] Halting process: ("Error on initialization")
2013-11-11 05:44:09 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 270eeb85-dc73-42f0-ac5f-b3438bfab530 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2013-11-11 05:44:10 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests.
2013-11-11 05:44:10 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:44:10 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0
2013-11-11 05:44:10 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2013-11-11 05:44:10 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2013-11-11 05:44:11 . b.s.d.worker [ERROR] Error on initialization of server mk-worker
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
Serialization trace:
val$r (com.twitter.chill.KryoInstantiator$4)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na]
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na]
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na]
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na]
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na]
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na]
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20]
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20]
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na]
... 29 common frames omitted
2013-11-11 05:44:11 . b.s.util [INFO] Halting process: ("Error on initialization")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment