Skip to content

Instantly share code, notes, and snippets.

@meddulla
Last active January 18, 2020 13:25
Show Gist options
  • Save meddulla/3fcaa0b0aa91db7bee2a8ce88280f04a to your computer and use it in GitHub Desktop.
Save meddulla/3fcaa0b0aa91db7bee2a8ce88280f04a to your computer and use it in GitHub Desktop.
import org.apache.kafka.streams.scala.ImplicitConversions._
import Serdes.{String, Long, sessionWindowedSerde}
/**
 * Summary statistics for one user session, as computed by the `pageStats` topology.
 *
 * @param mean          average gap between consecutive distinct event timestamps
 * @param max           largest gap between consecutive distinct event timestamps
 * @param startTs       earliest event timestamp in the session
 * @param endTs         latest event timestamp in the session
 * @param totalDuration `endTs - startTs` in WHOLE MINUTES (the raw difference is divided by
 *                      1000 and then by 60, so timestamps are presumably epoch millis — TODO confirm)
 * @param totalPages    number of distinct event timestamps in the session
 * @param clickRate     `totalPages / totalDuration` (integer division); 0 when the duration is 0
 */
case class PageInfo(
  mean: Long,
  max: Long,
  startTs: Long,
  endTs: Long,
  totalDuration: Long,
  totalPages: Int,
  clickRate: Long
)
// Play JSON format (reads + writes) for PageInfo, derived by the Json.format macro.
implicit val pageInfoFormat = Json.format[PageInfo] // PlayJson
// explicit manifests
/** Renders a JsValue as compact JSON text. */
val printer: JsValue => String = (x: JsValue) => Json.stringify(x)
val pageManifest: Manifest[PageInfo] = ManifestFactory.classType(classOf[PageInfo])
// Value serde for PageInfo. `toSerde` is defined elsewhere; presumably it serialises with
// `pageInfoFormat` and renders via `printer` — TODO confirm against its definition.
val pageInfoSerde = toSerde[PageInfo](pageManifest, pageInfoFormat, pageInfoFormat, printer)
// Key serde for session-windowed String keys. It must wrap an inner String serde: the no-arg
// SessionWindowedSerde constructor leaves the inner serializer null (it is only set by
// configure()), and a serde handed to Materialized.withKeySerde is not configured by Kafka
// Streams, so key serialisation would fail at runtime. `sessionWindowedSerde[String]` (imported
// from Serdes) builds the serde around the implicitly imported `String` serde.
val stringWinSerde: WindowedSerdes.SessionWindowedSerde[String] = sessionWindowedSerde[String]
// Streams
// Topology builder for the whole application.
val builder: StreamsBuilder = new StreamsBuilder
// Raw events read from `topic`; key/value serdes are resolved implicitly.
val inputStream: KStream[String, Event] = builder.stream[String, Event](topic)
// Rekey topic by sessionId. Events without a session id are dropped first, so the `.get`
// inside selectKey only ever sees a defined value.
// NOTE(review): keep filter + selectKey as two separate operators; merging them into one
// operator would likely renumber auto-generated processor/store/topic names downstream.
val sessionsStream: KStream[String, Event] =
  inputStream
    .filter((_, event) => event.eventData.custData.sessionId.isDefined)
    .selectKey((_, event) => event.eventData.custData.sessionId.get)
// Debug-print each re-keyed event, then forward it to `topicSessions`.
sessionsStream
  .peek((_, event) => println(s"*****TBL*****$event"))
  .to(topicSessions)
// Materialization settings for the per-session PageInfo table: a named key-value store
// using the session-windowed String key serde and the PageInfo value serde.
val pageStatsStore: Materialized[Windowed[String], PageInfo, ByteArrayKeyValueStore] = {
  val storeName = "page_stats_store"
  Materialized
    .as[Windowed[String], PageInfo, ByteArrayKeyValueStore](storeName)
    .withKeySerde(stringWinSerde)
    .withValueSerde(pageInfoSerde)
}
/**
 * Per-session page statistics.
 *
 * Events that carry a page type are grouped by session id (the key set on `sessionsStream`)
 * into session windows. Each session's event timestamps are collected, then summarised into a
 * [[PageInfo]] materialized in `pageStatsStore`. Only sessions with at least `minPages`
 * distinct timestamps are kept.
 */
val pageStats: KTable[Windowed[String], PageInfo] = sessionsStream
  .filter((_, ev) => ev.eventData.evData.pageType.isDefined)
  .groupByKey
  .windowedBy(SessionWindows.`with`(duration).grace(graceDuration))
  .aggregate(initializer = mutable.MutableList[Long]())(
    // Aggregator: append this event's timestamp to the session's list.
    (_: String, newValue: Event, aggValue: mutable.MutableList[Long]) => {
      val timestamp: Long = newValue.timestamp
      println(s"GOT TMTSP $timestamp")
      aggValue += timestamp
      aggValue
    },
    // Session merger: called when two sessions for the same key are combined.
    (_, a, b) => {
      println(s"Merging sessions: GOT 2 lists $a; $b")
      mergeLists(a, b)
    })
  .mapValues((ls: mutable.MutableList[Long]) => {
    println(s"GOT MUTABLE LIST $ls")
    // Events may be duplicated due to merging, so de-duplicate before sorting.
    val timestamps: List[Long] = ls.distinct.sorted.toList
    // Gaps between consecutive events. Zipping adjacent pairs yields no gap at all when there
    // are fewer than two events; sliding(2) would emit a partial window there and produce a
    // bogus gap of `0 - ts`, i.e. a negative mean/max for single-event sessions.
    val gaps: List[Long] = timestamps.zip(timestamps.drop(1)).map { case (prev, next) => next - prev }
    val mean: Long = if (gaps.nonEmpty) gaps.sum / gaps.length else 0L
    val max: Long = if (gaps.isEmpty) 0L else gaps.max
    val startTs: Long = timestamps.headOption.getOrElse(0L)
    val endTs: Long = timestamps.lastOption.getOrElse(0L)
    val totalPages: Int = timestamps.size
    // /1000 then /60: the duration is reported in WHOLE MINUTES (truncated), assuming
    // timestamps are epoch millis — TODO confirm. This is what PageInfo.totalDuration holds.
    val totalDurationMs: Long = endTs - startTs
    val totalDurationMins: Long = if (totalDurationMs > 0L) totalDurationMs / 1000L / 60L else 0L
    val clickRatePerMin: Long =
      if (totalPages > 0 && totalDurationMins > 0L) totalPages / totalDurationMins else 0L
    val p = PageInfo(mean, max, startTs, endTs, totalDurationMins, totalPages, clickRatePerMin)
    println(p)
    p
  }, pageStatsStore)
  //.suppress(Suppressed.untilTimeLimit(suppressTime,
  //  Suppressed.BufferConfig.maxRecords(suppressMaxRecords).emitEarlyWhenFull()))
  .filter((_, pg) => pg.totalPages >= minPages)
// Debug sink: print every retained session summary to stdout, labelled with `pageStore`.
pageStats
  .toStream
  .print(Printed.toSysOut[Windowed[String], PageInfo].withLabel(pageStore))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment