Skip to content

Instantly share code, notes, and snippets.

@kbialek
Created February 24, 2018 17:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kbialek/c8fe098e2040f1d22a2a1e6f08ef9fa5 to your computer and use it in GitHub Desktop.
Save kbialek/c8fe098e2040f1d22a2a1e6f08ef9fa5 to your computer and use it in GitHub Desktop.
Flink recent events example
package com.example.flink
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * Example Flink job: for each company, keeps the ratings whose event time is within the
 * last [[RatingExpiration]] milliseconds and emits a recomputed [[Score]] whenever that
 * set changes — when a rating arrives and when a rating expires.
 */
object RecentEventsExample {

  /** How long a rating is retained, in milliseconds of event time (1 day). */
  // NOTE: this is Int arithmetic; values of ~25 days or more overflow Int, so switch to
  // Long literals (e.g. `24L * 60 * 60 * 1000`) before raising the retention period.
  val RatingExpiration = 1 * 24 * 60 * 60 * 1000

  type CompanyId = Long
  type RatingId = Long

  /** A rating of a company; `timestamp` is the event time used for expiry, in milliseconds. */
  case class Rating(companyId: CompanyId, id: RatingId, value: Int, timestamp: Long)

  /** Score emitted for a company, computed from its currently retained ratings. */
  case class Score(companyId: CompanyId, score: Double)

  /** Computes a score from the retained ratings of one company (placeholder: always 0.0). */
  val scoringFunction = (ratings: java.lang.Iterable[Rating]) => {
    // scoring magic goes here
    0.0
  }

  /**
   * Whether `rating` has expired when an event-time timer fires at `timerTimestamp`.
   *
   * The comparison is inclusive so that the timer registered at
   * `rating.timestamp + RatingExpiration` evicts the very rating that registered it;
   * with a strict `<` that rating would survive its own timer and linger indefinitely
   * once no newer rating arrives.
   *
   * @param rating         the stored rating to test
   * @param timerTimestamp the firing time of the event-time timer, in milliseconds
   * @return true if the rating should be evicted
   */
  def isExpired(rating: Rating, timerTimestamp: Long): Boolean =
    rating.timestamp <= timerTimestamp - RatingExpiration

  /**
   * Maintains the retained ratings of the current key (the stream is keyed by company in
   * [[apply]]) and emits a fresh score on every arrival and on every expiry timer.
   */
  val processFunction = new ProcessFunction[Rating, Score] {
    // Keyed map state: rating id -> the latest rating stored under that id.
    lazy val state = getRuntimeContext.getMapState(
      new MapStateDescriptor("ratings", classOf[RatingId], classOf[Rating]))

    override def processElement(rating: Rating,
                                ctx: ProcessFunction[Rating, Score]#Context,
                                out: Collector[Score]): Unit = {
      // A rating with an id already present replaces the stored one.
      state.put(rating.id, rating)
      // Wake up exactly when this rating becomes expired so it can be evicted.
      ctx.timerService().registerEventTimeTimer(rating.timestamp + RatingExpiration)
      out.collect(Score(rating.companyId, scoringFunction(state.values())))
    }

    override def onTimer(timestamp: Long,
                         ctx: ProcessFunction[Rating, Score]#OnTimerContext,
                         out: Collector[Score]): Unit = {
      // The company id is taken from the stored ratings; it stays None if the state is empty.
      var companyId: Option[CompanyId] = None
      // NOTE(review): defensive — some MapState implementations may return null instead of
      // an empty iterator when the key has no entries; treat that as "nothing stored".
      Option(state.iterator()).foreach { entries =>
        while (entries.hasNext) {
          val rating = entries.next().getValue
          // All entries belong to the current key, so any rating yields the company id.
          companyId = Some(rating.companyId)
          if (isExpired(rating, timestamp)) {
            entries.remove()
          }
        }
      }
      // Emit the recalculated score (possibly over an empty set if everything expired);
      // emit nothing when there were no stored ratings at all.
      companyId.foreach(id => out.collect(Score(id, scoringFunction(state.values()))))
    }
  }

  /**
   * Keys `ratingStream` by company and emits a [[Score]] on every rating arrival and expiry.
   *
   * @param ratingStream incoming ratings; expiry is driven by event-time timers, so the
   *                     stream is expected to carry timestamps/watermarks
   * @return the stream of recomputed scores
   */
  def apply(ratingStream: DataStream[Rating]): DataStream[Score] =
    ratingStream
      .keyBy(_.companyId)
      .process(processFunction)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment