Created
February 24, 2018 17:44
-
-
Save kbialek/c8fe098e2040f1d22a2a1e6f08ef9fa5 to your computer and use it in GitHub Desktop.
Flink recent events example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.flink | |
import org.apache.flink.api.common.state.MapStateDescriptor | |
import org.apache.flink.streaming.api.functions.ProcessFunction | |
import org.apache.flink.streaming.api.scala._ | |
import org.apache.flink.util.Collector | |
/**
  * Keeps the recent ratings of each company in keyed Flink state and emits a
  * recalculated [[RecentEventsExample.Score]] whenever a rating arrives or expires.
  *
  * A rating is considered recent for [[RecentEventsExample.RatingExpiration]]
  * milliseconds after its own `timestamp`.
  */
object RecentEventsExample {

  /**
    * How long a rating stays in the state, in milliseconds (1 day).
    *
    * The product is evaluated in `Int` arithmetic: widen the first factor to
    * `Long` before raising the window beyond roughly 24 days, otherwise it overflows.
    */
  val RatingExpiration: Int = 1 * 24 * 60 * 60 * 1000

  type CompanyId = Long
  type RatingId = Long

  /** A single rating given to a company; `timestamp` is in milliseconds. */
  case class Rating(companyId: CompanyId, id: RatingId, value: Int, timestamp: Long)

  /** The score calculated for a company from the ratings currently held in state. */
  case class Score(companyId: CompanyId, score: Double)

  /** Calculates a score from the ratings currently held in state (placeholder: always `0.0`). */
  val scoringFunction: java.lang.Iterable[Rating] => Double = _ => {
    // scoring magic goes here
    0.0
  }

  /**
    * Stores every incoming rating, schedules its expiration and emits a fresh score;
    * evicts expired ratings when an expiration timer fires and emits the score again.
    *
    * Uses keyed map state and timers, so it has to be applied to a keyed stream
    * (see [[apply]]).
    */
  val processFunction: ProcessFunction[Rating, Score] = new ProcessFunction[Rating, Score] {

    // Resolved lazily because the runtime context is not available at construction time;
    // transient so the state handle is never serialized together with the function.
    @transient private lazy val state =
      getRuntimeContext.getMapState(
        new MapStateDescriptor[RatingId, Rating]("ratings", classOf[RatingId], classOf[Rating]))

    /**
      * Adds the rating to the state (a rating with the same id is overwritten), registers
      * an event-time timer for the moment the rating expires and emits the updated score.
      */
    override def processElement(
        rating: Rating,
        ctx: ProcessFunction[Rating, Score]#Context,
        out: Collector[Score]): Unit = {
      state.put(rating.id, rating)
      ctx.timerService().registerEventTimeTimer(rating.timestamp + RatingExpiration)
      out.collect(Score(rating.companyId, scoringFunction(state.values())))
    }

    /**
      * Removes every rating that has expired at `timestamp` and emits the recalculated
      * score. Nothing is emitted when the state held no ratings at all.
      */
    override def onTimer(
        timestamp: Long,
        ctx: ProcessFunction[Rating, Score]#OnTimerContext,
        out: Collector[Score]): Unit = {
      // The company id is only available from the stored ratings, so remember it
      // while scanning, including from ratings that are being evicted.
      var companyId: Option[CompanyId] = None

      // Timers are registered at `rating.timestamp + RatingExpiration`, so the rating
      // that scheduled this timer sits exactly on the cutoff. The comparison must be
      // inclusive, otherwise that rating would survive its own expiration timer.
      val cutoff = timestamp - RatingExpiration

      // evict expired ratings
      val entries = state.iterator()
      while (entries.hasNext) {
        val rating = entries.next().getValue
        companyId = Some(rating.companyId)
        if (rating.timestamp <= cutoff) {
          entries.remove()
        }
      }

      // emit recalculated score
      companyId.foreach { id =>
        out.collect(Score(id, scoringFunction(state.values())))
      }
    }
  }

  /**
    * Builds the pipeline: ratings are keyed by company and run through [[processFunction]].
    *
    * @param ratingStream the incoming ratings
    * @return the stream of scores emitted by [[processFunction]]
    */
  def apply(ratingStream: DataStream[Rating]): DataStream[Score] =
    ratingStream
      .keyBy(_.companyId)
      .process(processFunction)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment