Real-Time Experiment Rollups Implementation
Created November 10, 2013 00:19
Save amedina/7391946 to your computer and use it in GitHub Desktop.
Summingbird Skeleton Exp Rollup Job
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Umbrella target with no sources of its own: depending on it pulls in the
# common, job and client libraries of the experiment-rollup RT project.
scala_library(
    name="exprollup_rt",
    dependencies=[
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common"),
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/job"),
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/client"),
    ],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Read-side client library for the rollups. It is published as the
# com.twitter:exprollup-rt-client artifact through the internal ivy repo.
scala_library(
    name="client",
    dependencies=[
        pants("3rdparty/storehaus:storehaus-core"),
        pants("src/thrift/com/twitter/ads/realtime/exprolluprt"),
        pants("3rdparty/summingbird:summingbird-client"),
        pants("src/scala/com/twitter/storehaus_internal/manhattan"),
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common"),
    ],
    sources=globs("*.scala"),
    provides=artifact(
        org="com.twitter",
        name="exprollup-rt-client",
        repo=pants("build-support/twitter/ivy:internal"),
    ),
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.realtime.exprollup_rt.client | |
import com.twitter.scalding.Args | |
import com.twitter.summingbird.store.ClientStore | |
import com.twitter.storehaus.ReadableStore | |
import com.twitter.storehaus_internal.manhattan.ManhattanStore | |
import com.twitter.ads.batch.realtime.exprollup_rt.common.ExpRollupRTConfig | |
/**
 * Read-side client for the experiment-rollup request counts.
 *
 * Wraps the Manhattan store of the request dataset defined in
 * [[ExpRollupRTConfig]] in a summingbird `ClientStore`.
 *
 * @param args job arguments. They must carry every key that
 *             ExpRollupRTConfig reads with `args.required`: hdfs_path,
 *             manhattan_id, req_dataset_name, imp_dataset_name and
 *             eng_dataset_name.
 */
class ExpRollupRTClient(val args: Args) {
  import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey

  val config = new ExpRollupRTConfig(args)
  import config._

  // The config defines reqDataset / impDataset / engDataset; there is no
  // `dataset` member, so the previous reference did not resolve.
  // NOTE(review): only the request rollups get a client here. Impression and
  // engagement clients would follow the same pattern with impDataset/engDataset.
  val manhattanStore = ManhattanStore.fromDataset(reqDataset)

  // The dataset is keyed by the request thrift key, with (BatchID, Long)
  // values, so the store type follows that key rather than Long.
  val client: ReadableStore[ExpRollupRTReqKey, Long] =
    ClientStore(manhattanStore)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Library shared by the job and the client. It brings in:
#   - algebird;
#   - summingbird core and batch;
#   - the exprolluprt thrift keys;
#   - the internal summingbird bijection (serialization) helpers;
#   - the internal Manhattan and memcache store wrappers.
scala_library(
    name="common",
    dependencies=[
        pants("3rdparty/algebird:algebird-core"),
        pants("src/thrift/com/twitter/ads/realtime/exprolluprt"),
        pants("3rdparty/summingbird:summingbird-core"),
        pants("3rdparty/summingbird:summingbird-batch"),
        pants("src/scala/com/twitter/summingbird_internal/bijection"),
        pants("src/scala/com/twitter/storehaus_internal/manhattan"),
        pants("src/scala/com/twitter/storehaus_internal/memcache"),
    ],
    sources=globs("*.scala"),
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.realtime.exprollup_rt.common | |
import com.twitter.ads.realtime.exprolluprt._ | |
import com.twitter.bijection.{ Bijection, Injection } | |
import com.twitter.scalding.Args | |
import com.twitter.summingbird.batch.{ Batcher, BatchID } | |
import com.twitter.storehaus_internal.manhattan.Dataset | |
import com.twitter.storehaus_internal.memcache.MemcacheStore | |
import com.twitter.bijection.thrift.CompactThriftCodec | |
import com.twitter.summingbird_internal.bijection.BatchPairImplicits._ | |
import com.twitter.ads.adserver.CohortDimensions | |
/**
 * Configuration shared by the experiment-rollup job and client.
 *
 * Inputs are read from required arguments:
 *  - `hdfs_path`: root HDFS directory. The per-stream paths are its
 *    `requests`, `impressions` and `engagements` subdirectories.
 *  - `manhattan_id`: Manhattan application id used by all three datasets.
 *  - `req_dataset_name`, `imp_dataset_name`, `eng_dataset_name`: per-stream
 *    dataset names. They are also passed to `MemcacheStore.mergeable` for
 *    the online stores.
 *
 * For each stream (requests, impression callbacks, engagements) this exposes:
 *  - a compact-thrift key codec, also offered as an implicit;
 *  - a Manhattan `Dataset` with (BatchID, Long) values;
 *  - an implicit pair injection;
 *  - a mergeable memcache online store.
 *
 * Batches are one hour long.
 *
 * Keep the member order: vals initialize top to bottom, and each pair
 * injection reads its dataset, so the datasets must stay above the injections.
 */
class ExpRollupRTConfig(val args: Args) extends Serializable {

  // ---- Arguments and derived HDFS paths ----
  val hdfsPath = args.required("hdfs_path")
  val reqHdfsPath = underHdfsPath("requests")
  val impHdfsPath = underHdfsPath("impressions")
  val engHdfsPath = underHdfsPath("engagements")

  val manhattanAppID = args.required("manhattan_id")
  val requestDatasetName = args.required("req_dataset_name")
  val impressionDatasetName = args.required("imp_dataset_name")
  val engagementDatasetName = args.required("eng_dataset_name")

  /** Returns `hdfsPath` with `/leaf` appended. */
  private def underHdfsPath(leaf: String): String = hdfsPath + "/" + leaf

  // ---- Key codecs and Manhattan datasets ----
  def reqKeyCodec: Injection[ExpRollupRTReqKey, Array[Byte]] = CompactThriftCodec[ExpRollupRTReqKey]
  implicit lazy val reqKeyValueCodec = reqKeyCodec
  val reqDataset =
    Dataset[ExpRollupRTReqKey, (BatchID, Long)](reqHdfsPath, manhattanAppID, requestDatasetName)

  def impKeyCodec: Injection[ExpRollupRTImpCallBackKey, Array[Byte]] = CompactThriftCodec[ExpRollupRTImpCallBackKey]
  implicit lazy val impKeyValueCodec = impKeyCodec
  val impDataset =
    Dataset[ExpRollupRTImpCallBackKey, (BatchID, Long)](impHdfsPath, manhattanAppID, impressionDatasetName)

  def engKeyCodec: Injection[ExpRollupRTEngKey, Array[Byte]] = CompactThriftCodec[ExpRollupRTEngKey]
  implicit lazy val engKeyValueCodec = engKeyCodec
  val engDataset =
    Dataset[ExpRollupRTEngKey, (BatchID, Long)](engHdfsPath, manhattanAppID, engagementDatasetName)

  // ---- (key, (batch, count)) <-> (bytes, bytes) injections, one per dataset ----
  implicit val reqInjection: Injection[(ExpRollupRTReqKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    reqDataset.pairInjection
  implicit val impInjection: Injection[(ExpRollupRTImpCallBackKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    impDataset.pairInjection
  implicit val engInjection: Injection[(ExpRollupRTEngKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    engDataset.pairInjection

  // ---- Batching ----
  implicit val batcher = Batcher.ofHours(1)

  // ---- Online stores ----
  // These use a shared set of memcache hosts under twemcache_summingbird.
  // They are defs, so each call builds a new store.
  def reqOnlineStore = MemcacheStore.mergeable[(ExpRollupRTReqKey, BatchID), Long](requestDatasetName)
  def impOnlineStore = MemcacheStore.mergeable[(ExpRollupRTImpCallBackKey, BatchID), Long](impressionDatasetName)
  def engOnlineStore = MemcacheStore.mergeable[(ExpRollupRTEngKey, BatchID), Long](engagementDatasetName)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hadoop-stack artifacts pulled in by elephant-bird or cascading.
# Both cluster exclude lists below start from (or embed) this list.
_HADOOP_STACK_EXCLUDES = [
    exclude("org.apache", "hadoop-core"),
    exclude("org.apache", "hadoop-lzo"),
    exclude("com.hadoop", "hadoop-lzo"),
    exclude("org.apache.hadoop"),
    exclude("org.apache.hbase"),
]

# Deploy excludes for the Hadoop (scalding) binary.
ON_HADOOP_CLUSTER_EXCLUDES = (
    [
        exclude("commons-httpclient", "commons-httpclient"),
        exclude("commons-net", "commons-net"),
    ]
    + _HADOOP_STACK_EXCLUDES
    + [
        exclude("org.apache.zookeeper", "zookeeper"),
        exclude("org.mortbay.jetty"),
    ]
)

# Logging backends, added to the deploy excludes of both cluster binaries.
LOGGING_EXCLUDES = [
    exclude("log4j"),
    exclude("org.slf4j"),
    exclude("commons-logging", "commons-logging"),
    exclude("ch.qos.logback"),
]

# Deploy excludes for the Storm binary; also drops storm itself.
ON_STORM_CLUSTER_EXCLUDES = _HADOOP_STACK_EXCLUDES + [
    exclude("org.mortbay.jetty"),
    exclude("storm", "storm"),
]
# The summingbird job library. It depends on:
#   - summingbird;
#   - the exprolluprt thrift keys;
#   - the internal summingbird sources and bijections;
#   - the project's common and util libraries;
#   - the data protobufs.
scala_library(
    name="job",
    dependencies=[
        pants("3rdparty/summingbird:summingbird"),
        pants("src/thrift/com/twitter/ads/realtime/exprolluprt"),
        pants("src/scala/com/twitter/summingbird_internal/source"),
        pants("src/scala/com/twitter/summingbird_internal/bijection"),
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common"),
        pants("src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/util"),
        pants("src/protobuf/com/twitter/data/proto"),
    ],
    sources=globs("*.scala"),
)
# Three launchers over the same :job library, all entering through
# com.twitter.summingbird.Env.

# Storm: cluster-specific excludes plus logging excludes.
jvm_binary(
    name="exprollup-rt-storm",
    main="com.twitter.summingbird.Env",
    dependencies=[pants(":job")],
    deploy_excludes=ON_STORM_CLUSTER_EXCLUDES + LOGGING_EXCLUDES,
)

# Scalding: cluster-specific excludes plus logging excludes.
jvm_binary(
    name="exprollup-rt-scalding",
    main="com.twitter.summingbird.Env",
    dependencies=[pants(":job")],
    deploy_excludes=ON_HADOOP_CLUSTER_EXCLUDES + LOGGING_EXCLUDES,
)

# REPL: sets no deploy_excludes.
jvm_binary(
    name="exprollup-rt-repl",
    main="com.twitter.summingbird.Env",
    dependencies=[pants(":job")],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.realtime.exprollup_rt.job | |
import com.twitter.ads.logging.{AdEngagementLogEntry, AdShardRequestLogEntry} | |
import com.twitter.ads.eventstream.{ImpressionCallbackEvent} | |
import com.twitter.summingbird.Predef._ | |
import com.twitter.summingbird_internal.source._ | |
import com.twitter.ads.batch.realtime.exprollup_rt.common.ExpRollupRTConfig | |
import com.twitter.ads.batch.realtime.exprollup_rt.util.ExpRollupRTUtil | |
import com.twitter.ads.realtime.exprolluprt._ | |
import backtype.storm.Config | |
import com.twitter.summingbird.batch.BatchID | |
import com.twitter.summingbird.Predef.BatchID | |
import com.twitter.bijection.{ Bijection, Injection } | |
import com.twitter.summingbird_internal.bijection.BatchPairImplicits._ | |
/**
 * Summingbird job that counts three ad event streams: ad requests,
 * impression callbacks and ad engagements.
 *
 * Each record is turned into its thrift rollup key (built by
 * [[ExpRollupRTUtil]]) paired with a count of 1L. The pairs are then grouped
 * and summed into a `CompoundStore`, which combines:
 *  - an HDFS `VersionedStore` (offline side);
 *  - the matching memcache store from [[ExpRollupRTConfig]] (online side).
 *
 * @param env summingbird environment; its `args` build the ExpRollupRTConfig.
 */
class ExpRollupRTJob(env: Env) extends AbstractJob(env) {
  val config = new ExpRollupRTConfig(env.args)
  import config._

  val util = new ExpRollupRTUtil
  import util._

  // Offline stores: versioned HDFS stores rooted at each dataset's hdfsPath.
  val reqOfflineStore = VersionedStore[ExpRollupRTReqKey, Long](reqDataset.hdfsPath)
  val impOfflineStore = VersionedStore[ExpRollupRTImpCallBackKey, Long](impDataset.hdfsPath)
  val engOfflineStore = VersionedStore[ExpRollupRTEngKey, Long](engDataset.hdfsPath)

  // Ad requests: one count per shard entry, keyed on its inner AdRequestLogEntry.
  // @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/request_stream.thrift
  // NOTE(review): the string passed here is "exprollup_rt", while the other
  // two sources receive "ExpRollupRT". Confirm the difference is intended.
  val adShardRequests = AdShardRequestLogSource("exprollup_rt")
    .map { (shardEntry: AdShardRequestLogEntry) =>
      (createRequestKey(shardEntry.getAdRequestLogEntry), 1L)
    }
    .groupAndSumTo(CompoundStore(reqOfflineStore, reqOnlineStore))

  // Impression callbacks: one count per callback event.
  // @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/impression_callback_stream.thrift#n7
  val impressionCallbackEvents = ImpressionCallbackEventSource("ExpRollupRT")
    .map { (callback: ImpressionCallbackEvent) =>
      (createImpressionCallbackKey(callback), 1L)
    }
    .groupAndSumTo(CompoundStore(impOfflineStore, impOnlineStore))

  // Engagements: one count per engagement log entry.
  // @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/engagement_stream.thrift
  val adEngagementEvents = AdEngagementLogSource("ExpRollupRT")
    .map { (engagement: AdEngagementLogEntry) =>
      (createEngagementKey(engagement), 1L)
    }
    .groupAndSumTo(CompoundStore(engOfflineStore, engOnlineStore))

  /** Adds team and project metadata on top of the inherited topology config. */
  override def transformConfig(m: Map[String, AnyRef]): Map[String, AnyRef] = {
    val ownership = Map(
      Config.TOPOLOGY_TEAM_NAME -> "Revenue Quality",
      Config.TOPOLOGY_TEAM_EMAIL -> "amedina@twitter.com",
      Config.TOPOLOGY_PROJECT_NAME -> "Experiment Rollups RT"
    )
    super.transformConfig(m) ++ ownership
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
klin
zzhou
amedina
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helpers that build the rollup thrift keys from raw ad log and event records.
# NOTE(review): the Scala sources import com.twitter.ads.logging and
# com.twitter.ads.eventstream types that are not declared here. They
# presumably arrive transitively through the thrift target; confirm.
scala_library(
    name="util",
    dependencies=[
        pants("src/thrift/com/twitter/ads/realtime/exprolluprt"),
    ],
    sources=globs("*.scala"),
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.realtime.exprollup_rt.util | |
import com.twitter.ads.logging.{AdEngagementLogEntry, AdRequestLogEntry} | |
import com.twitter.ads.eventstream.{ImpressionCallbackEvent} | |
//import com.twitter.adserver.{DisplayLocation, ImpressionDataNeededAtEngagementTime, GenderType} | |
import com.twitter.ads.realtime.exprolluprt._ | |
import scala.collection.immutable._ | |
import com.twitter.ads.realtime.exprolluprt._ | |
/**
 * Builds the thrift rollup keys from raw ad log and event records.
 *
 * The class is Serializable, and its methods are called from inside
 * ExpRollupRTJob's map closures.
 *
 * Fields are populated through the generated `setX` methods rather than by
 * assigning the public fields directly. In Apache Thrift's Java code, direct
 * assignment to a primitive field does not set its isset bit. If any of
 * these fields is declared optional in the IDL (not visible here), it would
 * then be dropped on serialization and ignored by equals/hashCode.
 *
 * Nested structs are read through `Option(...)`. A record that lacks one of
 * them yields a key with the dependent fields left unset, instead of
 * throwing a NullPointerException inside the topology.
 */
class ExpRollupRTUtil extends Serializable {

  /**
   * Builds the key for one ad request.
   *
   * @param adReq the request log entry
   * @return a key carrying time, request id, client id, experiment ids,
   *         gender and country/region
   */
  def createRequestKey(adReq: AdRequestLogEntry): ExpRollupRTReqKey = {
    val reqKey = new ExpRollupRTReqKey
    reqKey.setTime(adReq.getTime)
    reqKey.setReqId(adReq.getRequestId)
    Option(adReq.getClientInfo).foreach { info => reqKey.setClientId(info.getClientId) }
    Option(adReq.getExperimentKey).foreach { expKey => reqKey.setExpIds(expKey.getExperimentIds) }
    reqKey.setGender(adReq.getGender)
    reqKey.setCountryRegion(adReq.getUserCountryRegion)
    // NOTE(review): time and reqId are part of the key, so each key likely
    // matches a single request. Confirm that this is the intended rollup
    // granularity.
    reqKey
  }

  /**
   * Builds the key for one impression callback.
   *
   * All fields come from the event's impression data. When that struct is
   * absent, the returned key has no fields set.
   *
   * @param icbEvt the impression callback event
   * @return the impression-callback key
   */
  def createImpressionCallbackKey(icbEvt: ImpressionCallbackEvent): ExpRollupRTImpCallBackKey = {
    val impKey = new ExpRollupRTImpCallBackKey
    // Read the impression data once instead of once per field.
    Option(icbEvt.getImpressionData).foreach { data =>
      impKey.setTime(data.getImpressionEpochTimeMilliSec)
      impKey.setClientId(data.getClientId)
      impKey.setReqId(data.getRequestId)
      impKey.setImpressionId(data.getImpressionId)
      impKey.setDispLoc(data.getDisplayLocation)
      impKey.setGender(data.getGender)
      Option(data.getExperimentKey).foreach { expKey => impKey.setExpIds(expKey.getExperimentIds) }
      impKey.setCountryRegion(data.getUserCountryRegion)
    }
    impKey
  }

  /**
   * Builds the key for one ad engagement.
   *
   * @param adEng the engagement log entry
   * @return a key carrying time, client id, engagement type, request id and
   *         impression id
   */
  def createEngagementKey(adEng: AdEngagementLogEntry): ExpRollupRTEngKey = {
    val engKey = new ExpRollupRTEngKey
    engKey.setTime(adEng.getTime)
    Option(adEng.getClientInfo).foreach { info => engKey.setClientId(info.getClientId) }
    Option(adEng.getEngagement).foreach { eng => engKey.setEngType(eng.getType) }
    engKey.setReqId(adEng.getRequestId)
    engKey.setImpressionId(adEng.getImpressionId)
    engKey
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment