Version of Joseph's BannerImpressionStream that uses Tranquility to produce directly to Druid rather than Kafka. Doesn't work: https://phabricator.wikimedia.org/T168550#3517248
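For reference, a job like this would normally be launched with spark-submit. A minimal sketch, assuming an assembly jar name and YARN master that are not specified in this gist (the class name and CLI flags come from the code below):

spark-submit \
  --master yarn \
  --class org.wikimedia.analytics.refinery.job.BannerImpressionsStream \
  refinery-job-assembly.jar \
  --kafka-brokers kafka1012.eqiad.wmnet:9092 \
  --batch-duration-seconds 10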
package org.wikimedia.analytics.refinery.job

import com.netaporter.uri.Uri
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.wikimedia.analytics.refinery.core.Webrequest
import scopt.OptionParser
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import com.metamx.tranquility.spark.{BeamFactory, BeamRDD}
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.format.DateTimeFormatter
import com.github.nscala_time.time.Imports._
import com.metamx.common.Granularity
import io.druid.granularity.QueryGranularities
import com.metamx.tranquility.druid.{DruidBeams, DruidEnvironment, DruidLocation, DruidRollup, SpecificDruidDimensions}
import io.druid.query.aggregation.{DoubleSumAggregatorFactory, LongSumAggregatorFactory}

// Add this import to your Spark job to be able to propagate events from any RDD to Druid
import com.metamx.tranquility.spark.BeamRDD._
object BannerImpressionBeamFactory {

  lazy val BeamInstance: Beam[BannerImpression] = {
    // Tranquility uses ZooKeeper (through the Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "druid1001.eqiad.wmnet:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = "druid/overlord" // Your overlord's druid.service.
    val discoveryPath = "discovery"     // Your overlord's druid.discovery.curator.path.
    val dataSource = "banner_activity_minutely_otto1"

    val dimensions = IndexedSeq(
      "campaign",
      "banner",
      "project",
      "uselang",
      "bucket",
      "anonymous",
      "status_code",
      "country",
      "device",
      "sample_rate"
    )

    val aggregators = Seq(
      new LongSumAggregatorFactory("request_count", "request_count"),
      new DoubleSumAggregatorFactory("normalized_request_count", "normalized_request_count")
    )
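    // DruidRollup pre-aggregates rows that share the same dimension values within a
    // minute (QueryGranularities.MINUTE), summing request_count and normalized_request_count.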
    // Expects impression.timestamp to return a Joda DateTime object.
    DruidBeams
      .builder((impression: BannerImpression) => impression.timestamp)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation(DruidEnvironment(indexService), dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.DAY,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 3
        )
      )
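      // With a PT10M windowPeriod, Tranquility only accepts events whose timestamps fall
      // within ten minutes of the current time; anything later (or earlier) is dropped.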
      .buildBeam()
  }
}
// Return a singleton, so the same connection is shared across all tasks in the same JVM.
class BannerImpressionBeamFactory extends BeamFactory[BannerImpression] {
  def makeBeam: Beam[BannerImpression] = BannerImpressionBeamFactory.BeamInstance
}
case class BannerImpression(
  dt: String,
  campaign: String,
  banner: String,
  project: String,
  uselang: String,
  bucket: String,
  anonymous: String,
  status_code: String,
  country: String,
  device: String,
  sample_rate: Double,
  request_count: Int,
  normalized_request_count: Double
) {
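  // NOTE: the pattern below carries no timezone, so dt strings must look like
  // 2017-08-10T20:20:00 (no trailing 'Z'); parseDateTime throws otherwise.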
  val iso8601Formatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
  val timestamp: DateTime = iso8601Formatter.parseDateTime(dt)
}
object BannerImpressionsStream {

  def run(
    @transient sc: SparkContext,
    kafkaBrokers: String,
    kafkaInputTopics: String,
    kafkaOutputTopic: String,
    batchDurationSeconds: Int,
    checkpointDirectory: String,
    noCheckpoint: Boolean
  ): Unit = {

    def newStreamingContext() = {
      val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
      ssc.checkpoint(checkpointDirectory)

      val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
      val kafkaInputParameters = Map[String, String]("metadata.broker.list" -> kafkaBrokers)

      // Get kafka batches from input topics
      val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaInputParameters,
        kafkaInputTopicsSet
      )
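      // The direct stream tracks Kafka offsets itself (recovered via checkpointing)
      // rather than committing them to ZooKeeper, so checkpointDirectory matters here.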
      // Compute banner oriented filtering / conversion / aggregation
      val bannerStream = messageStream.
        // Extract the JSON message from the Kafka (key, value) pair.
        map { case (_, str) => parse(str) }.
        filter(json => {
          (json \ "uri_path").values.asInstanceOf[String] == "/beacon/impression" &&
            (json \ "uri_query").values.asInstanceOf[String].contains("debug=false") &&
            !Webrequest.getInstance().isSpider((json \ "user_agent").values.asInstanceOf[String])
        }).
        map(json => {
          // Use \ (single child lookup) here for consistency with the other field
          // extractions; the original \\ would search recursively for the same result.
          val uri_query = (json \ "uri_query").values.asInstanceOf[String]
          val uri: Uri = Uri.parse("http://bla.org/woo/" + uri_query)
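          // Truncate seconds to :00 so each event lands in its minute bucket, matching
          // the MINUTE query granularity configured for the Druid rollup.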
          val minuteTs = (json \ "dt").values.asInstanceOf[String].replaceAll(":\\d\\d$", ":00")
          val params: Map[String, Seq[String]] = uri.query.paramMap

          ("dt" -> minuteTs) ~
            ("campaign" -> params.getOrElse("campaign", List.empty[String]).headOption) ~
            ("banner" -> params.getOrElse("banner", List.empty[String]).headOption) ~
            ("project" -> params.getOrElse("project", List.empty[String]).headOption) ~
            ("uselang" -> params.getOrElse("uselang", List.empty[String]).headOption) ~
            ("bucket" -> params.getOrElse("bucket", List.empty[String]).headOption) ~
            // Store as the string "true"/"false" so the downstream cast to String succeeds
            // (a JSON boolean here would make BannerImpression's asInstanceOf[String] throw).
            ("anonymous" -> (params.getOrElse("anonymous", List.empty[String]).headOption == Some("true")).toString) ~
            ("status_code" -> params.getOrElse("statusCode", List.empty[String]).headOption) ~
            ("country" -> params.getOrElse("country", List.empty[String]).headOption) ~
            ("device" -> params.getOrElse("device", List.empty[String]).headOption) ~
            ("sample_rate" -> params.getOrElse("recordImpressionSampleRate", List.empty[String]).headOption.map(_.toDouble))
        }).
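        // countByValue turns the per-request stream into (record, occurrences) pairs per
        // batch, grouping identical minute-level JSON records.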
        countByValue().
        map { case (json, count) =>
          val jsonSampleRate = json \ "sample_rate"
          json merge (
            ("request_count" -> count) ~
              ("normalized_request_count" -> {
                if (jsonSampleRate != JNothing) Some(count / jsonSampleRate.values.asInstanceOf[Double])
                else None
              })
          )
        }.
        map(j => BannerImpression(
          (j \ "dt").values.asInstanceOf[String],
          (j \ "campaign").values.asInstanceOf[String],
          (j \ "banner").values.asInstanceOf[String],
          (j \ "project").values.asInstanceOf[String],
          (j \ "uselang").values.asInstanceOf[String],
          (j \ "bucket").values.asInstanceOf[String],
          (j \ "anonymous").values.asInstanceOf[String],
          (j \ "status_code").values.asInstanceOf[String],
          (j \ "country").values.asInstanceOf[String],
          (j \ "device").values.asInstanceOf[String],
          (j \ "sample_rate").values.asInstanceOf[Double],
          // json4s stores integers as JInt(BigInt), so go through BigInt before narrowing.
          (j \ "request_count").values.asInstanceOf[BigInt].toInt,
          (j \ "normalized_request_count").values.asInstanceOf[Double]
        ))
      // Propagate each RDD to Druid.
      bannerStream.foreachRDD(rdd => rdd.propagate(new BannerImpressionBeamFactory))

      ssc
    }

    val context = {
      if (noCheckpoint) newStreamingContext()
      else StreamingContext.getOrCreate(checkpointDirectory, newStreamingContext)
    }

    // Start the context
    context.start()
    context.awaitTermination()
  }
  /**
    * Config class for CLI argument parser using scopt
    */
  case class Params(
    kafkaBrokers: String = Seq("12", "13", "14", "18", "20", "22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
    kafkaInputTopics: String = "webrequest_text",
    kafkaOutputTopic: String = "test_banner_impressions_otto1",
    batchDurationSecs: Int = 10,
    checkpointDirectory: String = "hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint_otto1",
    noCheckpoint: Boolean = false
  )
  /**
    * Define the command line options parser
    */
  val argsParser = new OptionParser[Params]("Banner Impressions Stream") {
    head("Banner Impressions Stream", "")
    note("Extract banner impression data from the Kafka webrequest stream and write it to Druid via Tranquility.")
    help("help") text "Prints this usage text"

    opt[String]('k', "kafka-brokers") optional() valueName "<broker1,...,brokerN>" action {
      (x, p) => p.copy(kafkaBrokers = x)
    } text "Kafka brokers to consume from. Defaults to kafka10[12|13|14|18|20|22].eqiad.wmnet:9092"

    opt[String]('i', "kafka-input-topics") optional() valueName "<topic1,...,topicK>" action {
      (x, p) => p.copy(kafkaInputTopics = x)
    } text "Input topics to consume. Defaults to webrequest_text"

    opt[String]('o', "kafka-output-topic") optional() valueName "<topic>" action {
      (x, p) => p.copy(kafkaOutputTopic = x)
    } text "Output topic to write to (unused in this Druid variant). Defaults to test_banner_impressions_otto1"

    opt[Int]("batch-duration-seconds") optional() action {
      (x, p) => p.copy(batchDurationSecs = x)
    } text "Batch duration in seconds. Defaults to 10."

    opt[String]("checkpoint-dir") optional() valueName "<path>" action {
      (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + "/")
    } text ("Temporary directory for check-pointing the streaming job.\n\t" +
      "Defaults to hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint_otto1")

    opt[Unit]("no-checkpoint") optional() action {
      (_, p) => p.copy(noCheckpoint = true)
    } text "Do NOT restore from an existing checkpoint; always build a fresh streaming context."
  }
  def main(args: Array[String]): Unit = {
    argsParser.parse(args, Params()) match {
      case Some(params) =>
        // Initial setup - SparkContext
        val conf = new SparkConf()
          .setAppName("BannerImpressionsStream")
        val sc = new SparkContext(conf)

        run(
          sc,
          params.kafkaBrokers,
          params.kafkaInputTopics,
          params.kafkaOutputTopic,
          params.batchDurationSecs,
          params.checkpointDirectory,
          params.noCheckpoint
        )
      case None => sys.exit(1)
    }
  }
}