Skip to content

Instantly share code, notes, and snippets.

@koushikmln
Last active June 2, 2018 08:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save koushikmln/2eb5dafb2cc7c723883623b6eadb717c to your computer and use it in GitHub Desktop.
Save koushikmln/2eb5dafb2cc7c723883623b6eadb717c to your computer and use it in GitHub Desktop.
val conf = ConfigFactory.load
val envProps: Config = conf.getConfig(args(0))
val sparkConf = new SparkConf().setMaster("yarn").setAppName("SiteTraffic")
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window")))
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
val topicsSet = Set("retail_logs")
val now = Calendar.getInstance().getTime()
val timestamp = streamingContext.sparkContext.broadcast(now)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> envProps.getString("bootstrap.server"),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val logData: DStream[String] =KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
).map(record => record.value)
val countryList = logData.map(line => {
val json: Option[Any] = JSON.parseFull(line)
val map = json.get.asInstanceOf[Map[String, Any]]
val geoIpMap = map.get("geoip").get.asInstanceOf[Map[String, Any]]
val country = geoIpMap.get("country_name").getOrElse("ALIEN").asInstanceOf[String]
val timestamp = map.get("rounded_timestamp").get.asInstanceOf[String]
((timestamp , country), 1)
}).reduceByKey(_ + _)
countryList.foreachRDD(countries =>{
countries.foreach(country =>{
insertOrUpdateMetrics(country._1._1, country._1._2, country._2, broadcastConfig.value)
})
})
streamingContext.start()
streamingContext.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment