@ottomata
Last active August 10, 2017 20:20
Version of Joseph's BannerImpressionStream that uses Tranquility to produce directly to Druid rather than Kafka. Doesn't work: https://phabricator.wikimedia.org/T168550#3517248
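In outline, the Tranquility-on-Spark pattern the job below relies on is: build a singleton Beam with DruidBeams inside a BeamFactory, then call propagate (brought into scope by importing BeamRDD._) on every micro-batch RDD. A minimal sketch of that wiring, with a hypothetical Event type standing in for BannerImpression and the DruidBeams setup elided:

// Condensed sketch of the pattern used in the full job below; Event and
// EventBeamFactory are illustrative placeholders, not part of the gist.
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.spark.BeamFactory
import com.metamx.tranquility.spark.BeamRDD._ // adds rdd.propagate(...)

case class Event(timestamp: org.joda.time.DateTime, count: Long)

object EventBeamFactory {
  // In the real job this is DruidBeams.builder(...)....buildBeam(); elided here.
  lazy val BeamInstance: Beam[Event] = ???
}

class EventBeamFactory extends BeamFactory[Event] {
  // One shared Beam per JVM, so every task reuses the same Druid connection.
  def makeBeam: Beam[Event] = EventBeamFactory.BeamInstance
}

// Inside the streaming job, each micro-batch is then pushed to Druid:
//   eventStream.foreachRDD(rdd => rdd.propagate(new EventBeamFactory))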
package org.wikimedia.analytics.refinery.job
import com.netaporter.uri.Uri
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.wikimedia.analytics.refinery.core.Webrequest
import scopt.OptionParser
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import com.metamx.tranquility.spark.{BeamFactory, BeamRDD}
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.format.DateTimeFormatter
import com.github.nscala_time.time.Imports._
import com.metamx.common.Granularity
import io.druid.granularity.QueryGranularities
import com.metamx.tranquility.druid.{DruidBeams, DruidEnvironment, DruidLocation, DruidRollup, SpecificDruidDimensions}
import io.druid.query.aggregation.{DoubleSumAggregatorFactory, LongSumAggregatorFactory}
// Add this import to your Spark job to be able to propagate events from any RDD to Druid
import com.metamx.tranquility.spark.BeamRDD._
object BannerImpressionBeamFactory {
  lazy val BeamInstance: Beam[BannerImpression] = {
    // Tranquility uses ZooKeeper (through the Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "druid1001.eqiad.wmnet:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = "druid/overlord" // Your overlord's druid.service.
    val discoveryPath = "discovery"     // Your overlord's druid.discovery.curator.path
    val dataSource = "banner_activity_minutely_otto1"

    val dimensions = IndexedSeq(
      "campaign",
      "banner",
      "project",
      "uselang",
      "bucket",
      "anonymous",
      "status_code",
      "country",
      "device",
      "sample_rate"
    )
    val aggregators = Seq(
      new LongSumAggregatorFactory("request_count", "request_count"),
      new DoubleSumAggregatorFactory("normalized_request_count", "normalized_request_count")
    )

    // Expects impression.timestamp to return a Joda DateTime object.
    DruidBeams
      .builder((impression: BannerImpression) => impression.timestamp)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation(DruidEnvironment(indexService), dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.DAY,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 3
        )
      )
      .buildBeam()
  }
}
class BannerImpressionBeamFactory extends BeamFactory[BannerImpression] {
  // Return a singleton, so the same connection is shared across all tasks in the same JVM.
  def makeBeam: Beam[BannerImpression] = BannerImpressionBeamFactory.BeamInstance
}
case class BannerImpression(
  dt: String,
  campaign: String,
  banner: String,
  project: String,
  uselang: String,
  bucket: String,
  anonymous: String,
  status_code: String,
  country: String,
  device: String,
  sample_rate: Double,
  request_count: Int,
  normalized_request_count: Double
) {
  // Note: the pattern has no timezone designator, so dt is expected to look like
  // "2017-08-10T20:20:00" (a trailing "Z" would fail to parse).
  val iso8601Formatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
  val timestamp: DateTime = iso8601Formatter.parseDateTime(dt)
}
object BannerImpressionsStream {

  def run(
    @transient sc: SparkContext,
    kafkaBrokers: String,
    kafkaInputTopics: String,
    kafkaOutputTopic: String, // unused in this Tranquility/Druid variant
    batchDurationSeconds: Int,
    checkpointDirectory: String,
    noCheckpoint: Boolean
  ): Unit = {

    def newStreamingContext() = {
      val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
      ssc.checkpoint(checkpointDirectory)

      val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
      val kafkaInputParameters = Map[String, String]("metadata.broker.list" -> kafkaBrokers)

      // Get kafka batches from input topics
      val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaInputParameters,
        kafkaInputTopicsSet
      )

      // Compute banner oriented filtering / conversion / aggregation
      val bannerStream = messageStream.
        // Extract the JSON message from the Kafka (key, value) message.
        map { case (_, str) => parse(str) }.
        // Keep only non-spider banner impression beacons with debug disabled.
        filter(json => {
          (json \ "uri_path").values.asInstanceOf[String] == "/beacon/impression" &&
          (json \ "uri_query").values.asInstanceOf[String].contains("debug=false") &&
          !Webrequest.getInstance().isSpider((json \ "user_agent").values.asInstanceOf[String])
        }).
        // Project each request down to a minute-truncated timestamp plus the banner query parameters.
        map(json => {
          val uri_query = (json \ "uri_query").values.asInstanceOf[String]
          val uri: Uri = Uri.parse("http://bla.org/woo/" + uri_query)
          val minuteTs = (json \ "dt").values.asInstanceOf[String].replaceAll(":\\d\\d$", ":00")
          val params: Map[String, Seq[String]] = uri.query.paramMap
          ("dt" -> minuteTs) ~
          ("campaign" -> params.getOrElse("campaign", List.empty[String]).headOption) ~
          ("banner" -> params.getOrElse("banner", List.empty[String]).headOption) ~
          ("project" -> params.getOrElse("project", List.empty[String]).headOption) ~
          ("uselang" -> params.getOrElse("uselang", List.empty[String]).headOption) ~
          ("bucket" -> params.getOrElse("bucket", List.empty[String]).headOption) ~
          ("anonymous" -> (params.getOrElse("anonymous", List.empty[String]).headOption == Some("true"))) ~
          ("status_code" -> params.getOrElse("statusCode", List.empty[String]).headOption) ~
          ("country" -> params.getOrElse("country", List.empty[String]).headOption) ~
          ("device" -> params.getOrElse("device", List.empty[String]).headOption) ~
          ("sample_rate" -> params.getOrElse("recordImpressionSampleRate", List.empty[String]).headOption.map(_.toDouble))
        }).
        // Identical per-minute records collapse into a count within each batch.
        countByValue().
        map { case (json, count) =>
          val jsonSampleRate = json \ "sample_rate"
          json merge (
            ("request_count" -> count) ~
            ("normalized_request_count" -> {
              if (jsonSampleRate != JNothing) Some(count / jsonSampleRate.values.asInstanceOf[Double])
              else None
            })
          )
        }.
        // Convert the aggregated JSON back into the case class handed to Tranquility.
        // Note: json4s represents the "anonymous" flag as a Boolean and the count as a
        // BigInt, so those two fields are converted explicitly instead of cast; fields
        // derived from optional query parameters are still assumed to be present.
        map(j => BannerImpression(
          (j \ "dt").values.asInstanceOf[String],
          (j \ "campaign").values.asInstanceOf[String],
          (j \ "banner").values.asInstanceOf[String],
          (j \ "project").values.asInstanceOf[String],
          (j \ "uselang").values.asInstanceOf[String],
          (j \ "bucket").values.asInstanceOf[String],
          (j \ "anonymous").values.toString,
          (j \ "status_code").values.asInstanceOf[String],
          (j \ "country").values.asInstanceOf[String],
          (j \ "device").values.asInstanceOf[String],
          (j \ "sample_rate").values.asInstanceOf[Double],
          (j \ "request_count").values.asInstanceOf[BigInt].toInt,
          (j \ "normalized_request_count").values.asInstanceOf[Double]
        ))

      // Propagate each RDD to Druid via Tranquility.
      bannerStream.foreachRDD(rdd => rdd.propagate(new BannerImpressionBeamFactory))
      ssc
    }

    val context = {
      if (noCheckpoint) newStreamingContext()
      else StreamingContext.getOrCreate(checkpointDirectory, newStreamingContext)
    }

    // Start the context
    context.start()
    context.awaitTermination()
  }
  /**
   * Config class for CLI argument parser using scopt
   */
  case class Params(
    kafkaBrokers: String = Seq("12", "13", "14", "18", "20", "22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
    kafkaInputTopics: String = "webrequest_text",
    kafkaOutputTopic: String = "test_banner_impressions_otto1",
    batchDurationSecs: Int = 10,
    checkpointDirectory: String = "hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint_otto1",
    noCheckpoint: Boolean = false
  )

  /**
   * Define the command line options parser
   */
  val argsParser = new OptionParser[Params]("Banner Impressions Stream") {
    head("Banner Impressions Stream", "")
    note("Extract banner impression data from the Kafka webrequest stream and write it to Druid via Tranquility")
    help("help") text "Prints this usage text"

    opt[String]('k', "kafka-brokers") optional() valueName "<broker1,...,brokerN>" action {
      (x, p) => p.copy(kafkaBrokers = x)
    } text "Kafka brokers to consume from. Defaults to kafka10[12|13|14|18|20|22].eqiad.wmnet:9092"

    opt[String]('i', "kafka-input-topics") optional() valueName "<topic1,...,topicK>" action {
      (x, p) => p.copy(kafkaInputTopics = x)
    } text "Input topics to consume. Defaults to webrequest_text"

    opt[String]('o', "kafka-output-topic") optional() valueName "<topic>" action {
      (x, p) => p.copy(kafkaOutputTopic = x)
    } text "Output topic to write to (unused in this Druid variant). Defaults to test_banner_impressions_otto1"

    opt[Int]("batch-duration-seconds") optional() action {
      (x, p) => p.copy(batchDurationSecs = x)
    } text "Batch duration in seconds. Defaults to 10."

    opt[String]("checkpoint-dir") optional() valueName "<path>" action {
      (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + "/")
    } text ("Temporary directory for check-pointing streaming job.\n\t" +
      "Defaults to hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint_otto1")

    opt[Unit]("no-checkpoint") optional() action {
      (_, p) => p.copy(noCheckpoint = true)
    } text "Force NOT using an existing checkpoint (wipes the existing checkpoint directory, if any)."
  }
  def main(args: Array[String]) {
    argsParser.parse(args, Params()) match {
      case Some(params) =>
        // Initial setup - Spark context
        val conf = new SparkConf()
          .setAppName("BannerImpressionsStream")
        val sc = new SparkContext(conf)

        run(
          sc,
          params.kafkaBrokers,
          params.kafkaInputTopics,
          params.kafkaOutputTopic,
          params.batchDurationSecs,
          params.checkpointDirectory,
          params.noCheckpoint
        )
      case None => sys.exit(1)
    }
  }
}