Skip to content

Instantly share code, notes, and snippets.

@amedina
Created November 10, 2013 00:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amedina/7391946 to your computer and use it in GitHub Desktop.
Save amedina/7391946 to your computer and use it in GitHub Desktop.
Summingbird Skeleton Exp Rollup Job
# Umbrella target: depends on the common, job and client libraries of the
# real-time experiment rollup so they can be pulled in through one name.
scala_library(
  name = 'exprollup_rt',
  dependencies = [
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common'),
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/job'),
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/client'),
  ],
)
# Client library: every *.scala file in this directory, published as the
# 'exprollup-rt-client' artifact under the 'com.twitter' organisation.
scala_library(
  name = 'client',
  dependencies = [
    pants('3rdparty/storehaus:storehaus-core'),
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
    pants('3rdparty/summingbird:summingbird-client'),
    pants('src/scala/com/twitter/storehaus_internal/manhattan'),
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common'),
  ],
  sources = globs('*.scala'),
  provides = artifact(
    org = 'com.twitter',
    name = 'exprollup-rt-client',
    repo = pants('build-support/twitter/ivy:internal'),
  ),
)
package com.twitter.ads.batch.realtime.exprollup_rt.client
import com.twitter.scalding.Args
import com.twitter.summingbird.store.ClientStore
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus_internal.manhattan.ManhattanStore
import com.twitter.ads.batch.realtime.exprollup_rt.common.ExpRollupRTConfig
/**
 * Read-side client for the experiment rollup.
 *
 * Builds an `ExpRollupRTConfig` from the supplied arguments, creates a
 * Manhattan store from a dataset taken from that config, and wraps the store
 * in a summingbird `ClientStore`.
 *
 * @param args command-line arguments, forwarded unchanged to `ExpRollupRTConfig`
 */
class ExpRollupRTClient(val args: Args) {
val config = new ExpRollupRTConfig(args)
// Bring the config's members into scope.
import config._
// NOTE(review): `dataset` is not a member of the ExpRollupRTConfig in this
// gist, which defines `reqDataset`, `impDataset` and `engDataset`. Confirm
// which dataset this client is meant to read.
val manhattanStore = ManhattanStore.fromDataset(dataset)
// NOTE(review): the config's datasets are keyed by thrift structs
// (ExpRollupRTReqKey / ExpRollupRTImpCallBackKey / ExpRollupRTEngKey), not
// Long. The [Long, Long] signature looks like skeleton residue - confirm the
// intended key type before relying on this client.
val client: ReadableStore[Long, Long] =
ClientStore(manhattanStore)
}
# Shared configuration library for the rollup job and its client.
# Depends on algebird-core, the exprolluprt thrift definitions, the
# summingbird core and "Batch" libraries, the internal summingbird bijection
# (serialization) helpers, and the internal storehaus Manhattan and memcache
# stores.
scala_library(
  name = 'common',
  dependencies = [
    pants('3rdparty/algebird:algebird-core'),
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
    pants('3rdparty/summingbird:summingbird-core'),
    pants('3rdparty/summingbird:summingbird-batch'),
    pants('src/scala/com/twitter/summingbird_internal/bijection'),
    pants('src/scala/com/twitter/storehaus_internal/manhattan'),
    pants('src/scala/com/twitter/storehaus_internal/memcache')
  ],
  sources = globs('*.scala'),
)
package com.twitter.ads.batch.realtime.exprollup_rt.common
import com.twitter.ads.realtime.exprolluprt._
import com.twitter.bijection.{ Bijection, Injection }
import com.twitter.scalding.Args
import com.twitter.summingbird.batch.{ Batcher, BatchID }
import com.twitter.storehaus_internal.manhattan.Dataset
import com.twitter.storehaus_internal.memcache.MemcacheStore
import com.twitter.bijection.thrift.CompactThriftCodec
import com.twitter.summingbird_internal.bijection.BatchPairImplicits._
import com.twitter.ads.adserver.CohortDimensions
/**
 * Configuration shared by the experiment rollup job and its client.
 *
 * Reads the required arguments `hdfs_path`, `manhattan_id`,
 * `req_dataset_name`, `imp_dataset_name` and `eng_dataset_name`, and derives
 * from them, for each of the three event types (requests, impressions,
 * engagements): an HDFS path, a thrift key codec, a Manhattan `Dataset`, a
 * pair injection and an online memcache store.
 *
 * All implicit members carry explicit types so that implicit resolution does
 * not depend on type inference or declaration order.
 *
 * @param args command-line arguments the settings are read from
 */
class ExpRollupRTConfig(val args: Args) extends Serializable {
  // Root HDFS directory; each event type lives in its own sub-directory.
  val hdfsPath: String = args.required("hdfs_path")
  val reqHdfsPath: String = s"$hdfsPath/requests"
  val impHdfsPath: String = s"$hdfsPath/impressions"
  val engHdfsPath: String = s"$hdfsPath/engagements"

  val manhattanAppID: String = args.required("manhattan_id")
  val requestDatasetName: String = args.required("req_dataset_name")
  val impressionDatasetName: String = args.required("imp_dataset_name")
  val engagementDatasetName: String = args.required("eng_dataset_name")

  // --- Requests -----------------------------------------------------------
  def reqKeyCodec: Injection[ExpRollupRTReqKey, Array[Byte]] =
    CompactThriftCodec[ExpRollupRTReqKey]
  implicit lazy val reqKeyValueCodec: Injection[ExpRollupRTReqKey, Array[Byte]] =
    reqKeyCodec
  val reqDataset =
    Dataset[ExpRollupRTReqKey, (BatchID, Long)](reqHdfsPath, manhattanAppID, requestDatasetName)

  // --- Impression callbacks -----------------------------------------------
  def impKeyCodec: Injection[ExpRollupRTImpCallBackKey, Array[Byte]] =
    CompactThriftCodec[ExpRollupRTImpCallBackKey]
  implicit lazy val impKeyValueCodec: Injection[ExpRollupRTImpCallBackKey, Array[Byte]] =
    impKeyCodec
  val impDataset =
    Dataset[ExpRollupRTImpCallBackKey, (BatchID, Long)](impHdfsPath, manhattanAppID, impressionDatasetName)

  // --- Engagements --------------------------------------------------------
  def engKeyCodec: Injection[ExpRollupRTEngKey, Array[Byte]] =
    CompactThriftCodec[ExpRollupRTEngKey]
  implicit lazy val engKeyValueCodec: Injection[ExpRollupRTEngKey, Array[Byte]] =
    engKeyCodec
  val engDataset =
    Dataset[ExpRollupRTEngKey, (BatchID, Long)](engHdfsPath, manhattanAppID, engagementDatasetName)

  // Key/value pair injections exposed by each dataset.
  implicit val reqInjection: Injection[(ExpRollupRTReqKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    reqDataset.pairInjection
  implicit val impInjection: Injection[(ExpRollupRTImpCallBackKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    impDataset.pairInjection
  implicit val engInjection: Injection[(ExpRollupRTEngKey, (BatchID, Long)), (Array[Byte], Array[Byte])] =
    engDataset.pairInjection

  // One batch per hour.
  implicit val batcher: Batcher = Batcher.ofHours(1)

  // The onlineStore uses a shared set of memcache hosts under twemcache_summingbird.
  // These are defs, so every access evaluates MemcacheStore.mergeable again.
  // NOTE(review): presumably this keeps the store out of the serialized config - confirm.
  def reqOnlineStore = MemcacheStore.mergeable[(ExpRollupRTReqKey, BatchID), Long](requestDatasetName)
  def impOnlineStore = MemcacheStore.mergeable[(ExpRollupRTImpCallBackKey, BatchID), Long](impressionDatasetName)
  def engOnlineStore = MemcacheStore.mergeable[(ExpRollupRTEngKey, BatchID), Long](engagementDatasetName)
}
# Artifacts left out of the deploy jar of the Scalding binary (combined with
# LOGGING_EXCLUDES in that target's deploy_excludes).
ON_HADOOP_CLUSTER_EXCLUDES = [
  exclude('commons-httpclient', 'commons-httpclient'),
  exclude('commons-net', 'commons-net'),
  # These are pulled in by elephant-bird or cascading:
  exclude('org.apache', 'hadoop-core'),
  exclude('org.apache', 'hadoop-lzo'),
  exclude('com.hadoop', 'hadoop-lzo'),
  exclude('org.apache.hadoop'),
  exclude('org.apache.hbase'),
  exclude('org.apache.zookeeper', 'zookeeper'),
  exclude('org.mortbay.jetty'),
]
# Logging artifacts left out of the deploy jars of the Storm and Scalding
# binaries.
LOGGING_EXCLUDES = [
  exclude('log4j'),
  exclude('org.slf4j'),
  exclude('commons-logging', 'commons-logging'),
  exclude('ch.qos.logback'),
]
# Artifacts left out of the deploy jar of the Storm binary (combined with
# LOGGING_EXCLUDES in that target's deploy_excludes).
ON_STORM_CLUSTER_EXCLUDES = [
  # These are pulled in by elephant-bird or cascading:
  exclude('org.apache', 'hadoop-core'),
  exclude('org.apache', 'hadoop-lzo'),
  exclude('com.hadoop', 'hadoop-lzo'),
  exclude('org.apache.hadoop'),
  exclude('org.apache.hbase'),
  exclude('org.mortbay.jetty'),
  exclude('storm', 'storm'),
]
# Job library: every *.scala file in this directory. Depends on summingbird,
# the exprolluprt thrift definitions, the internal summingbird sources and
# bijections, this project's common and util libraries, and the data protobufs.
scala_library(
  name = 'job',
  dependencies = [
    pants('3rdparty/summingbird:summingbird'),
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
    pants('src/scala/com/twitter/summingbird_internal/source'),
    pants('src/scala/com/twitter/summingbird_internal/bijection'),
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/common'),
    pants('src/scala/com/twitter/ads/batch/experimental/amedina/summingbird/exp_rollup_rt/util'),
    pants('src/protobuf/com/twitter/data/proto'),
  ],
  sources = globs('*.scala'),
)
# Storm binary: runs the :job library through the summingbird Env main class,
# leaving the Storm-cluster and logging excludes out of the deploy jar.
jvm_binary(
  name = 'exprollup-rt-storm',
  main = 'com.twitter.summingbird.Env',
  dependencies = [
    pants(':job'),
  ],
  deploy_excludes = ON_STORM_CLUSTER_EXCLUDES + LOGGING_EXCLUDES
)
# Scalding binary: runs the :job library through the summingbird Env main
# class, leaving the Hadoop-cluster and logging excludes out of the deploy jar.
jvm_binary(
  name = 'exprollup-rt-scalding',
  main = 'com.twitter.summingbird.Env',
  dependencies = [
    pants(':job'),
  ],
  deploy_excludes = ON_HADOOP_CLUSTER_EXCLUDES + LOGGING_EXCLUDES
)
# REPL binary: same main class and dependency as the other binaries, with no
# deploy excludes.
jvm_binary(
  name = 'exprollup-rt-repl',
  main = 'com.twitter.summingbird.Env',
  dependencies = [
    pants(':job'),
  ],
)
package com.twitter.ads.batch.realtime.exprollup_rt.job
import com.twitter.ads.logging.{AdEngagementLogEntry, AdShardRequestLogEntry}
import com.twitter.ads.eventstream.{ImpressionCallbackEvent}
import com.twitter.summingbird.Predef._
import com.twitter.summingbird_internal.source._
import com.twitter.ads.batch.realtime.exprollup_rt.common.ExpRollupRTConfig
import com.twitter.ads.batch.realtime.exprollup_rt.util.ExpRollupRTUtil
import com.twitter.ads.realtime.exprolluprt._
import backtype.storm.Config
import com.twitter.summingbird.batch.BatchID
import com.twitter.summingbird.Predef.BatchID
import com.twitter.bijection.{ Bijection, Injection }
import com.twitter.summingbird_internal.bijection.BatchPairImplicits._
/**
 * Summingbird job that counts ad-shard requests, impression callbacks and
 * ad engagements per rollup key.
 *
 * Each of the three pipelines maps an incoming event to `(key, 1L)` and sums
 * the ones into a `CompoundStore` made of an offline `VersionedStore` (rooted
 * at the matching dataset's HDFS path) and the online store from the config.
 *
 * @param env summingbird environment; its `args` are used to build the config
 */
class ExpRollupRTJob(env: Env) extends AbstractJob(env) {
  val config = new ExpRollupRTConfig(env.args)
  import config._

  val util = new ExpRollupRTUtil
  import util._

  // Offline stores, one per event type.
  val reqOfflineStore = VersionedStore[ExpRollupRTReqKey, Long](reqDataset.hdfsPath)
  val impOfflineStore = VersionedStore[ExpRollupRTImpCallBackKey, Long](impDataset.hdfsPath)
  val engOfflineStore = VersionedStore[ExpRollupRTEngKey, Long](engDataset.hdfsPath)

  /*
   * Ad-shard requests: one count per request key.
   * @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/request_stream.thrift
   *
   * NOTE(review): this source is created with "exprollup_rt" while the other
   * two use "ExpRollupRT" - confirm whether the difference is intentional.
   */
  val adShardRequests =
    AdShardRequestLogSource("exprollup_rt")
      .map { (adReq: AdShardRequestLogEntry) =>
        (createRequestKey(adReq.getAdRequestLogEntry), 1L)
      }
      .groupAndSumTo(CompoundStore(reqOfflineStore, reqOnlineStore))

  /*
   * Impression callbacks: one count per impression-callback key.
   * @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/impression_callback_stream.thrift#n7
   */
  val impressionCallbackEvents =
    ImpressionCallbackEventSource("ExpRollupRT")
      .map { (icbEvt: ImpressionCallbackEvent) =>
        (createImpressionCallbackKey(icbEvt), 1L)
      }
      .groupAndSumTo(CompoundStore(impOfflineStore, impOnlineStore))

  /*
   * Ad engagements: one count per engagement key.
   * @see https://cgit.twitter.biz/science/tree/src/thrift/com/twitter/ads/eventstream/engagement_stream.thrift
   */
  val adEngagementEvents =
    AdEngagementLogSource("ExpRollupRT")
      .map { (adEng: AdEngagementLogEntry) =>
        (createEngagementKey(adEng), 1L)
      }
      .groupAndSumTo(CompoundStore(engOfflineStore, engOnlineStore))

  /**
   * Adds the owning team's name, contact e-mail and project name to the
   * configuration produced by the parent job.
   *
   * @param m incoming configuration map
   * @return the parent's transformed map plus the three team/project entries
   */
  override def transformConfig(m: Map[String, AnyRef]): Map[String, AnyRef] = {
    val inherited = super.transformConfig(m)
    val ownership: Map[String, AnyRef] = Map(
      Config.TOPOLOGY_TEAM_NAME -> "Revenue Quality",
      Config.TOPOLOGY_TEAM_EMAIL -> "amedina@twitter.com",
      Config.TOPOLOGY_PROJECT_NAME -> "Experiment Rollups RT"
    )
    inherited ++ ownership
  }
}
klin
zzhou
amedina

Real-Time Experiment Rollups Implementation

# Utility library: every *.scala file in this directory; its only declared
# dependency is the exprolluprt thrift definitions.
scala_library(
  name = 'util',
  dependencies = [
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
  ],
  sources = globs('*.scala'),
)
package com.twitter.ads.batch.realtime.exprollup_rt.util
import com.twitter.ads.logging.{AdEngagementLogEntry, AdRequestLogEntry}
import com.twitter.ads.eventstream.{ImpressionCallbackEvent}
//import com.twitter.adserver.{DisplayLocation, ImpressionDataNeededAtEngagementTime, GenderType}
import com.twitter.ads.realtime.exprolluprt._
import scala.collection.immutable._
import com.twitter.ads.realtime.exprolluprt._
/**
 * Builders that copy fields from raw ad log events into the rollup key
 * structs used by the experiment rollup job.
 *
 * NOTE(review): fields are assigned directly rather than through setters. If
 * the key classes are thrift-generated Java structs, confirm that direct
 * assignment is sufficient for any optional primitive fields.
 */
class ExpRollupRTUtil extends Serializable {

  /**
   * Builds a request key from an ad request log entry.
   *
   * @param adReq source request log entry
   * @return a new key carrying time, request id, client id, experiment ids,
   *         gender and country/region of the request
   */
  def createRequestKey(adReq: AdRequestLogEntry): ExpRollupRTReqKey = {
    val key = new ExpRollupRTReqKey
    key.time = adReq.getTime
    key.reqId = adReq.getRequestId
    key.clientId = adReq.getClientInfo.getClientId
    key.expIds = adReq.getExperimentKey.getExperimentIds
    key.gender = adReq.getGender
    key.countryRegion = adReq.getUserCountryRegion
    key
  }

  /**
   * Builds an impression-callback key from the event's impression data.
   *
   * @param icbEvt source impression callback event
   * @return a new key carrying time, client id, request id, impression id,
   *         display location, gender, experiment ids and country/region
   */
  def createImpressionCallbackKey(icbEvt: ImpressionCallbackEvent): ExpRollupRTImpCallBackKey = {
    // Every field is read from the event's impression data.
    val data = icbEvt.getImpressionData
    val key = new ExpRollupRTImpCallBackKey
    key.time = data.getImpressionEpochTimeMilliSec
    key.clientId = data.getClientId
    key.reqId = data.getRequestId
    key.impressionId = data.getImpressionId
    key.dispLoc = data.getDisplayLocation
    key.gender = data.getGender
    key.expIds = data.getExperimentKey.getExperimentIds
    key.countryRegion = data.getUserCountryRegion
    key
  }

  /**
   * Builds an engagement key from an ad engagement log entry.
   *
   * @param adEng source engagement log entry
   * @return a new key carrying time, client id, engagement type, request id
   *         and impression id
   */
  def createEngagementKey(adEng: AdEngagementLogEntry): ExpRollupRTEngKey = {
    val key = new ExpRollupRTEngKey
    key.time = adEng.getTime
    key.clientId = adEng.getClientInfo.getClientId
    key.engType = adEng.getEngagement.getType
    key.reqId = adEng.getRequestId
    key.impressionId = adEng.getImpressionId
    key
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment