Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Last active November 6, 2022 20:47
Show Gist options
  • Save HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0 to your computer and use it in GitHub Desktop.
Save HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0 to your computer and use it in GitHub Desktop.
[Flink DataStream API] An example of a custom session window type (a logout event closes the session, in addition to closing it after a period of inactivity)
package net.heartsavior.flink
import java.util
import java.util.{Calendar, Collections}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{MergingWindowAssigner, WindowAssigner}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable
import scala.util.Random
/**
 * Entry point of the example job.
 *
 * Reads the events produced by [[GameEventSource]], groups them per user with
 * [[CustomSessionWindowAssigner]], and prints the results of
 * [[FindIncompleteGamesFunction]] together with its "events-before-login" side output.
 */
object CustomSessionWindowExample {

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.setAutoWatermarkInterval(1000L)

    val rawEvents = streamEnv
      .addSource(new GameEventSource)
      .setParallelism(1)

    // The source could set timestamps and emit watermarks by itself, but assigning them
    // downstream is worth knowing as well.
    val events: DataStream[GameEvents] =
      rawEvents.assignTimestampsAndWatermarks(new GameTimeAssigner)

    val incompleteGameSessions = events
      .keyBy(event => event.userId)
      .window(new CustomSessionWindowAssigner)
      .process(new FindIncompleteGamesFunction)

    // Main output goes to stdout.
    incompleteGameSessions.print("incomplete-games")

    // Side output goes to stderr; the tag is recreated here with the same id string.
    val beforeLoginTag = new OutputTag[GameEvents]("events-before-login")
    incompleteGameSessions
      .getSideOutput(beforeLoginTag)
      .printToErr("events-before-login")

    streamEnv.execute("Find incomplete game sessions from game events")
  }
}
/**
 * A single event reported for a user.
 *
 * @param userId        id of the user the event belongs to
 * @param eventType     one of the string constants defined in `GameEventTypes`
 * @param gameSessionId id of the game the event refers to, if any
 * @param timestamp     event time; used as the record timestamp by `GameTimeAssigner`
 */
case class GameEvents(
    userId: Long,
    eventType: String,
    gameSessionId: Option[Long],
    timestamp: Long)
/**
 * Result record: a game that was started by a user but has no matching end event.
 *
 * @param userId        id of the user who started the game
 * @param gameSessionId id of the game that was never finished
 */
case class IncompleteGameSession(
    userId: Long,
    gameSessionId: Long)
/** String constants used as the `eventType` of a `GameEvents` record. */
object GameEventTypes {
  /** The user logged in. */
  val LOG_IN: String = "login"

  /** A game was started. */
  val GAME_START: String = "game_start"

  /** A game was finished. */
  val GAME_END: String = "game_end"

  /** The user logged out. */
  val LOG_OUT: String = "logout"
}
/**
 * Timestamp extractor for [[GameEvents]]; the base class is constructed with an
 * out-of-orderness bound of five seconds.
 */
class GameTimeAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[GameEvents](Time.seconds(5)) {

  /** Uses the event's own `timestamp` field as its event time. */
  override def extractTimestamp(t: GameEvents): Long = {
    val eventTime = t.timestamp
    eventTime
  }
}
/**
 * Window function that reports, for one user session, every game that was started after the
 * login but never ended inside the same window.
 *
 * Events that precede the first login of the window are forwarded unchanged to the
 * "events-before-login" side output instead of being analysed.
 */
class FindIncompleteGamesFunction
  extends ProcessWindowFunction[GameEvents, IncompleteGameSession, Long, SessionEndAwareTimeWindow] {

  // side-output to emit events in session before logged in
  lazy val eventsBeforeLoginOutput: OutputTag[GameEvents] =
    new OutputTag[GameEvents]("events-before-login")

  /**
   * @param key      the user id the window is keyed by
   * @param context  window context, used here only to write to the side output
   * @param elements all events collected in the window
   * @param out      collector receiving one [[IncompleteGameSession]] per unfinished game
   */
  override def process(
      key: Long,
      context: Context,
      elements: Iterable[GameEvents],
      out: Collector[IncompleteGameSession]): Unit = {
    println(elements.mkString("========= ", ",", " ======="))

    // The window hands over elements in arrival order, which is not necessarily event-time
    // order (the job tolerates out-of-order events). "Before login" is a statement about event
    // time, so order by timestamp first; sortBy is stable, so in-order input is unaffected.
    val ordered = elements.toSeq.sortBy(_.timestamp)

    // One pass splits the window into the prefix before the first login and the rest.
    val (eventsBeforeLogin, sessionEvents) =
      ordered.span(_.eventType != GameEventTypes.LOG_IN)
    eventsBeforeLogin.foreach(context.output(eventsBeforeLoginOutput, _))

    // Events without a game id cannot be attributed to a game and are skipped.
    val openedGameSessions = sessionEvents.collect {
      case GameEvents(_, GameEventTypes.GAME_START, Some(gameId), _) => gameId
    }.toSet
    val closedGameSessions = sessionEvents.collect {
      case GameEvents(_, GameEventTypes.GAME_END, Some(gameId), _) => gameId
    }.toSet

    openedGameSessions
      .diff(closedGameSessions)
      .foreach(gameId => out.collect(IncompleteGameSession(key, gameId)))
  }
}
/**
 * A time window that additionally records whether the session was closed explicitly.
 *
 * It extends TimeWindow so that existing pieces, especially EventTimeTrigger, can be reused
 * instead of being reimplemented.
 *
 * @param startTime  start of the window
 * @param endTime    end of the window; windows created for a session end carry -1 here
 * @param sessionEnd true when the window represents a session that has been closed
 */
class SessionEndAwareTimeWindow(val startTime: Long, val endTime: Long, val sessionEnd: Boolean)
  extends TimeWindow(startTime, endTime) {

  /**
   * For a closed session this is the window start, otherwise `endTime - 1`.
   *
   * NOTE(review): `cover` keeps the smallest start, so for a merged closed session this is the
   * earliest start of the session rather than the time of the closing event. Flink compares this
   * value with the watermark when merging windows; confirm that is intended.
   */
  override def maxTimestamp(): Long =
    if (sessionEnd) startTime else endTime - 1

  // The TimeWindow-typed variants are intentionally left unimplemented; only the overloads
  // taking a SessionEndAwareTimeWindow are supported.
  override def intersects(other: TimeWindow): Boolean = ???

  override def cover(other: TimeWindow): TimeWindow = ???

  /**
   * Tells whether `other` overlaps this window.
   *
   * Callers must pass windows in ascending start order: `other` may not start before this
   * window, otherwise `require` fails with an IllegalArgumentException.
   */
  def intersects(other: SessionEndAwareTimeWindow): Boolean = {
    // assuming sort order is guaranteed
    require(startTime <= other.startTime)
    // A session-end window is treated as a single point at its start.
    val otherUpperBound = if (other.sessionEnd) other.startTime else other.endTime
    startTime <= otherUpperBound && endTime >= other.startTime
  }

  /**
   * Returns the window resulting from merging `other` into this one. If `other` closes the
   * session, the result is a closed window (end -1); otherwise it spans both windows.
   */
  def cover(other: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = {
    val mergedStart = Math.min(startTime, other.startTime)
    if (other.sessionEnd) {
      new SessionEndAwareTimeWindow(mergedStart, -1, true)
    } else {
      new SessionEndAwareTimeWindow(mergedStart, Math.max(endTime, other.endTime), false)
    }
  }

  def canEqual(other: Any): Boolean = other match {
    case _: SessionEndAwareTimeWindow => true
    case _ => false
  }

  override def equals(other: Any): Boolean = other match {
    case that: SessionEndAwareTimeWindow =>
      val sameFields =
        startTime == that.startTime &&
          endTime == that.endTime &&
          sessionEnd == that.sessionEnd
      sameFields && that.canEqual(this) && super.equals(that)
    case _ => false
  }

  override def hashCode(): Int =
    Seq[AnyVal](super.hashCode(), startTime, endTime, sessionEnd)
      .foldLeft(0)((acc, field) => 31 * acc + field.hashCode())

  override def toString: String =
    s"SessionEndAwareTimeWindow($startTime, $endTime, $sessionEnd)"
}
object SessionEndAwareTimeWindow {

  /**
   * Serializer for [[SessionEndAwareTimeWindow]].
   *
   * The binary layout is fixed: start (long), end (long), sessionEnd (boolean).
   * Windows are treated as immutable, so the copy methods return their argument.
   */
  @SerialVersionUID(1L)
  class Serializer extends TypeSerializerSingleton[SessionEndAwareTimeWindow] {

    override def isImmutableType: Boolean = true

    // No meaningful "empty" window exists, so no instance is pre-created.
    override def createInstance: SessionEndAwareTimeWindow = null

    override def copy(from: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = from

    override def copy(
        from: SessionEndAwareTimeWindow,
        reuse: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = from

    /**
     * Number of bytes written by `serialize`: two longs and one boolean.
     * The serializer contract expects the fixed length here (or -1 for variable length), so
     * returning 0 would misreport the size of the record.
     */
    override def getLength: Int = 8 + 8 + 1

    override def serialize(record: SessionEndAwareTimeWindow, target: DataOutputView): Unit = {
      target.writeLong(record.startTime)
      target.writeLong(record.endTime)
      target.writeBoolean(record.sessionEnd)
    }

    /** Reads the fields in the order `serialize` wrote them. */
    override def deserialize(source: DataInputView): SessionEndAwareTimeWindow = {
      val start = source.readLong
      val end = source.readLong
      val sessionEnd = source.readBoolean()
      new SessionEndAwareTimeWindow(start, end, sessionEnd)
    }

    // Instances are immutable, so the reuse object is ignored.
    override def deserialize(
        reuse: SessionEndAwareTimeWindow,
        source: DataInputView): SessionEndAwareTimeWindow = deserialize(source)

    /** Copies one serialized window from `source` to `target` without materialising it. */
    override def copy(source: DataInputView, target: DataOutputView): Unit = {
      target.writeLong(source.readLong)
      target.writeLong(source.readLong)
      target.writeBoolean(source.readBoolean())
    }

    override def canEqual(obj: Any): Boolean =
      obj.isInstanceOf[SessionEndAwareTimeWindow.Serializer]
  }
}
/**
 * Session window assigner that closes a session either on a logout event or after a period of
 * inactivity.
 *
 * Every element gets its own window; overlapping windows are merged in `mergeWindows`.
 *
 * @param sessionGapMillis length of the window created for a non-logout event, i.e. the
 *                         inactivity gap in milliseconds; defaults to five minutes
 */
class CustomSessionWindowAssigner(sessionGapMillis: Long = 5 * 60 * 1000)
  extends MergingWindowAssigner[GameEvents, SessionEndAwareTimeWindow] {

  require(sessionGapMillis > 0, s"sessionGapMillis must be positive, got $sessionGapMillis")

  /**
   * A logout event yields a session-end window starting at its timestamp (end -1); any other
   * event yields an open window of `sessionGapMillis` starting at its timestamp.
   */
  override def assignWindows(
      element: GameEvents,
      timestamp: Long,
      context: WindowAssigner.WindowAssignerContext): util.Collection[SessionEndAwareTimeWindow] = {
    val window =
      if (element.eventType == GameEventTypes.LOG_OUT) {
        new SessionEndAwareTimeWindow(timestamp, -1, true)
      } else {
        new SessionEndAwareTimeWindow(timestamp, timestamp + sessionGapMillis, false)
      }
    Collections.singletonList(window)
  }

  override def getDefaultTrigger(
      env: environment.StreamExecutionEnvironment): Trigger[GameEvents, SessionEndAwareTimeWindow] = {
    // EventTimeTrigger is declared for TimeWindow; the cast reuses it for the subclass.
    EventTimeTrigger.create().asInstanceOf[Trigger[GameEvents, SessionEndAwareTimeWindow]]
  }

  override def getWindowSerializer(
      executionConfig: ExecutionConfig): TypeSerializer[SessionEndAwareTimeWindow] = {
    new SessionEndAwareTimeWindow.Serializer
  }

  /**
   * Sorts the windows by start time, folds neighbouring windows that intersect into groups and
   * reports every group with more than one member to `callback`.
   */
  override def mergeWindows(
      windows: util.Collection[SessionEndAwareTimeWindow],
      callback: MergingWindowAssigner.MergeCallback[SessionEndAwareTimeWindow]): Unit = {
    import scala.collection.JavaConverters._

    // (window covering the group, members of the group)
    type Group = (SessionEndAwareTimeWindow, Vector[SessionEndAwareTimeWindow])

    // sort the windows by the start time and then merge overlapping windows
    val sortedWindows = windows.asScala.toList.sortBy(_.startTime)
    println(sortedWindows.mkString("Windows to merge: ========= ", ",", " ======="))

    // Only the most recent group can absorb the next candidate, because the input is sorted.
    val groups = sortedWindows.foldLeft(Vector.empty[Group]) { (acc, candidate) =>
      acc.lastOption match {
        case Some((covering, members)) if covering.intersects(candidate) =>
          acc.updated(acc.length - 1, (covering.cover(candidate), members :+ candidate))
        case _ =>
          acc :+ ((candidate, Vector(candidate)))
      }
    }

    groups.foreach {
      case (covering, members) if members.size > 1 =>
        // The members are copied into a java.util.ArrayList on purpose: Flink's
        // MergingWindowSet.addWindow removes elements through the collection's iterator, and
        // the Scala-to-Java wrapper's Iterator.remove throws UnsupportedOperationException.
        callback.merge(new util.ArrayList[SessionEndAwareTimeWindow](members.asJava), covering)
      case _ => // a group of one has nothing to merge
    }
  }

  override def isEventTime: Boolean = true
}
/**
 * Demo source emitting a fixed script of events for one randomly chosen user: a session that
 * ends with a logout, two events after the logout, and a second session that is left idle.
 */
class GameEventSource extends RichSourceFunction[GameEvents] {

  override def run(srcCtx: SourceContext[GameEvents]): Unit = {
    // initialize random number generator
    val rng = new Random()
    // Event-time clock of the script; starts at the current wall-clock time.
    var clock = Calendar.getInstance.getTimeInMillis
    val player = rng.nextLong()

    // Emits one event for the player, stamped with the current script time.
    def emit(eventType: String, game: Option[Long] = None): Unit =
      srcCtx.collect(GameEvents(player, eventType, game, clock))

    // Moves the script time forward.
    def advance(millis: Long): Unit = clock += millis

    // --- first session: login, one finished game, one unfinished game, logout ---
    emit(GameEventTypes.LOG_IN)

    advance(500)
    val firstGame = rng.nextLong()
    emit(GameEventTypes.GAME_START, Some(firstGame))
    advance(2000)
    emit(GameEventTypes.GAME_END, Some(firstGame))

    advance(1000)
    val secondGame = rng.nextLong()
    emit(GameEventTypes.GAME_START, Some(secondGame))

    advance(2000)
    emit(GameEventTypes.LOG_OUT)

    // --- events after the logout (invalid) ---
    advance(500)
    val strayGame = rng.nextLong()
    emit(GameEventTypes.GAME_START, Some(strayGame))
    advance(1500)
    emit(GameEventTypes.GAME_END, Some(strayGame))

    // --- second session: new login, one finished game, one game that outlives the session ---
    advance(500)
    emit(GameEventTypes.LOG_IN)

    advance(1000)
    val fourthGame = rng.nextLong()
    emit(GameEventTypes.GAME_START, Some(fourthGame))
    advance(500)
    emit(GameEventTypes.GAME_END, Some(fourthGame))

    val lateGame = rng.nextLong()
    advance(2000)
    emit(GameEventTypes.GAME_START, Some(lateGame))

    // out of session: the end of the game is signalled after the session is closed
    advance(6 * 60 * 1000)
    emit(GameEventTypes.GAME_END, Some(lateGame))

    // force emit watermark to see result immediately
    srcCtx.emitWatermark(new Watermark(clock + (6 * 60 * 1000)))
    Thread.sleep(3000)
    // Flink will finish the application after run() method returns
  }

  // run() emits a finite script and returns by itself, so there is nothing to stop here.
  override def cancel(): Unit = ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment