Skip to content

Instantly share code, notes, and snippets.

@fr3dch3n fr3dch3n/Main.scala
Created Sep 29, 2018

Embed
What would you like to do?
NeverCodeAlone - Realtime Aggregation des Otto.de Event Streams
/**
 * Service entry point: consumes click events from one partition of the source
 * topic, counts page views per `epochTimestamp` value and publishes each finished
 * count to the metrics topic.
 *
 * All configuration is read from environment variables, see
 * [[validateMandatoryParameters]].
 */
object Main extends LazyLogging {

  /**
   * Runs the poll loop until a JVM shutdown signal is received.
   *
   * Threading: `KafkaConsumer` is not safe for multi-threaded access, while producer
   * callbacks run on the producer's I/O thread. The callback therefore only records
   * the offset that became committable; the actual `commitAsync` is issued from the
   * polling thread.
   *
   * @param args command line arguments (unused)
   * @throws IllegalArgumentException if a mandatory environment variable is missing
   */
  def main(args: Array[String]): Unit = {
    // Local import keeps the new dependency inside this block.
    import java.util.concurrent.atomic.AtomicLong

    validateMandatoryParameters()
    val groupId = sys.env("GROUP_ID")
    val brokerList = sys.env("KAFKA_URL")
    val sourceTopic = sys.env("SOURCE_TOPIC")
    val metricsTopic = sys.env("METRICS_TOPIC")
    logger.info("Starting up service hopper...")

    val running = new AtomicBoolean(true)
    val latch = new CountDownLatch(1)
    val consumer = new KafkaConsumer[String, String](KafkaConfig.consumerProperties(brokerList, groupId, false))
    val producer = new KafkaProducer[String, String](KafkaConfig.producerProperties(brokerList))
    consumer.assign(Set(new TopicPartition(sourceTopic, KafkaConfig.Partition)))

    // Hand-over slot between the producer I/O thread (writer) and the poll thread
    // (reader). Holds the latest offset whose metric was acknowledged, or the
    // sentinel when nothing is waiting to be committed.
    val noPendingCommit = -1L
    val pendingCommitOffset = new AtomicLong(noPendingCommit)

    // Must only be called from the thread that owns `consumer`.
    def commitPendingOffset(): Unit = {
      val offset = pendingCommitOffset.getAndSet(noPendingCommit)
      if (offset != noPendingCommit) {
        consumer.commitAsync(Map(new TopicPartition(sourceTopic, KafkaConfig.Partition) -> offset))
      }
    }

    sys.addShutdownHook {
      logger.info("Received Shutdown signal. Setting running to false...")
      running.set(false)
      // Keep the JVM alive until the main thread has released its resources.
      latch.await()
    }

    try {
      logger.info("Entering poll loop...")
      var pageViewMap = Map.empty[Long, Int]
      // NOTE: starts at 0, so the first page click emits a record keyed "0" with
      // count 0 before the first real window is tracked.
      var currentTime = 0L
      while (running.get()) {
        consumer
          .poll(100)
          .flatMap(record => Click.parse(record.value()).map(click => (click, record.offset())))
          .foreach { case (click, offset) =>
            if (click.clickType == Page) {
              val time = click.epochTimestamp
              val count = pageViewMap.getOrElse(time, 0)
              pageViewMap = pageViewMap + (time -> (count + 1))
              if (time != currentTime) {
                // The timestamp changed: publish the count collected for the
                // previous timestamp and start tracking the new one.
                val pageViewsOfSecond = pageViewMap.getOrElse(currentTime, 0)
                producer.send(
                  new ProducerRecord[String, String](
                    metricsTopic,
                    KafkaConfig.Partition,
                    currentTime.toString,
                    pageViewsOfSecond.toString),
                  new Callback {
                    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
                      Option(exception) match {
                        // Runs on the producer I/O thread: only record the offset
                        // here, the poll thread performs the commit.
                        case None => pendingCommitOffset.set(offset)
                        case Some(e) => logger.error(e.getMessage, e)
                      }
                    }
                  }
                )
                logger.info(s"sent $currentTime of $pageViewsOfSecond")
                pageViewMap = pageViewMap - currentTime
                currentTime = time
              }
            }
            logger.info(s"$click")
          }
        // Commit whatever the producer acknowledged since the last iteration.
        commitPendingOffset()
      }
      logger.info("Left poll loop")
    } finally {
      // Always release resources and the latch, even if the loop failed; otherwise
      // the shutdown hook would wait on the latch forever.
      try {
        // close() waits for in-flight sends, so their callbacks have run afterwards.
        producer.close()
        commitPendingOffset()
      } finally {
        try consumer.close()
        finally latch.countDown()
      }
    }
    logger.info("Finished main")
  }

  /**
   * Fails fast when a required environment variable is not set.
   *
   * Only some of these variables are read in [[main]]; the remaining ones are
   * presumably consumed elsewhere (e.g. by `KafkaConfig`) - TODO confirm.
   *
   * @throws IllegalArgumentException naming the first missing variable
   */
  private def validateMandatoryParameters(): Unit = {
    List(
      "APP_COMMIT",
      "CERT_PASS",
      "CERT_PATH",
      "ENV",
      "GROUP_ID",
      "KAFKA_URL",
      "KAFKA_KEYSTORE_PATH",
      "KAFKA_TRUSTSTORE_PATH",
      "KEYSTORE_PW",
      "SOURCE_TOPIC",
      "STATE_TOPIC",
      "METRICS_TOPIC"
    ).foreach(param => require(sys.env.contains(param), s"Environment variable is missing: $param"))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.