Created
July 26, 2019 13:50
-
-
Save HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29 to your computer and use it in GitHub Desktop.
[Spark Structured Streaming] An example of a custom session window: a session is closed either by an explicit logout event or by a period of inactivity.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.heartsavior.spark | |
import java.sql.Timestamp | |
import java.util.Calendar | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.execution.streaming.MemoryStream | |
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger} | |
import scala.util.Random | |
/**
 * Example of a custom session window on top of `flatMapGroupsWithState`.
 *
 * Events of one user are grouped into sessions. A session ends either when no event
 * arrives within the inactivity gap, or as soon as a logout event is seen. When a session
 * is evicted by the event-time timeout, every game that was started after the login of
 * that session but never ended is emitted as an [[IncompleteGameSession]].
 */
object CustomSessionWindowExample {

  /**
   * Builds a local Spark session, wires the sessionization query over an in-memory stream,
   * feeds it the sample events from `ingestData` and blocks until the query terminates.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Sample")
      .master("local[*]")
      .getOrCreate()

    // MemoryStream requires an implicit SQLContext
    implicit val context = spark.sqlContext
    import spark.implicits._

    val inputData = MemoryStream[(Long, String, Long, Long)]
    // the 4th input column is epoch milliseconds; it is divided by 1000 before the cast to TIMESTAMP
    val events = inputData.toDF()
      .selectExpr("_1", "_2", "_3", "CAST(_4 / 1000 AS TIMESTAMP)")
      .as[(Long, String, Long, Timestamp)]
      .map(r => GameEvents(r._1, r._2, r._3, r._4))
      .withWatermark("timestamp", "5 seconds")

    val outputMode = OutputMode.Append()
    // inactivity gap after which a session is considered finished
    val sessionGapMills = 5 * 60 * 1000

    val stateFunc: (Long, Iterator[GameEvents], GroupState[List[SessionWindow]])
      => Iterator[IncompleteGameSession] =
      (userId: Long, events: Iterator[GameEvents], state: GroupState[List[SessionWindow]]) => {

        /**
         * Returns the games that were started but not ended within the given
         * (chronologically ordered) events of one session.
         */
        def findIncompleteGameSessions(elements: List[GameEvents]): List[IncompleteGameSession] = {
          println(elements.mkString("========= ", ",", " ======="))

          // Events preceding the first login of the session are ignored:
          // we could extract them, but side-output is not supported.
          val sinceLogin = elements.dropWhile(_.eventType != GameEventTypes.LOG_IN)

          // ids of all games (gameSessionId >= 0) having an event of the given type
          def gameSessionsWith(eventType: String): Set[Long] =
            sinceLogin.collect {
              case evt if evt.eventType == eventType && evt.gameSessionId >= 0 => evt.gameSessionId
            }.toSet

          gameSessionsWith(GameEventTypes.GAME_START)
            .diff(gameSessionsWith(GameEventTypes.GAME_END))
            .toList
            .map(gameNo => IncompleteGameSession(userId, gameNo))
        }

        /**
         * Invoked on timeout: removes every session whose max timestamp is behind the
         * watermark from the state and emits the incomplete games of those sessions.
         */
        def handleEvict(userId: Long, state: GroupState[List[SessionWindow]]): Iterator[IncompleteGameSession] = {
          state.getOption match {
            case Some(sessions) =>
              val watermark = state.getCurrentWatermarkMs()
              // partition (rather than span) so the result does not depend on the list order
              val (evicted, kept) = sessions.partition(_.maxTimestamp < watermark)

              if (kept.isEmpty) {
                state.remove()
              } else {
                state.update(kept)
                // every kept session has maxTimestamp >= watermark, so this is a valid timeout
                state.setTimeoutTimestamp(kept.map(_.maxTimestamp).min)
              }

              outputMode match {
                case s if s == OutputMode.Append() =>
                  evicted.iterator.flatMap(ev => findIncompleteGameSessions(ev.events))
                case s => throw new UnsupportedOperationException(s"Not supported output mode $s")
              }

            case None =>
              state.remove()
              Iterator.empty
          }
        }

        /**
         * Tells whether `session2` (which must not start before `session1`) belongs to
         * `session1`. A session that has already been closed by a logout never accepts
         * further events.
         */
        def intersectingSessions(session1: SessionWindow, session2: SessionWindow): Boolean = {
          require(session1.startTime <= session2.startTime)
          if (session1.sessionEnd) {
            // logout closes the session: whatever comes later starts a new one
            false
          } else if (session2.sessionEnd) {
            // a logout session has no meaningful endTime, only its start counts
            session1.endTime >= session2.startTime
          } else {
            session1.startTime <= session2.endTime && session1.endTime >= session2.startTime
          }
        }

        /** Wraps a single event into its own session; a logout yields a closed session. */
        def assignSession(event: GameEvents): SessionWindow = {
          val eventTime = event.timestamp.getTime
          if (event.eventType == GameEventTypes.LOG_OUT) {
            SessionWindow(eventTime, -1, List(event), sessionEnd = true)
          } else {
            SessionWindow(eventTime, eventTime + sessionGapMills, List(event), sessionEnd = false)
          }
        }

        /** Merges `session2` into `session1`; the result is closed iff `session2` is closed. */
        def mergeSession(session1: SessionWindow, session2: SessionWindow): SessionWindow = {
          SessionWindow(
            startTime = Math.min(session1.startTime, session2.startTime),
            endTime = Math.max(session1.endTime, session2.endTime),
            events = session1.events ++ session2.events,
            sessionEnd = session2.sessionEnd)
        }

        /**
         * Invoked when new events arrived for the user: rebuilds the sessions from the
         * events kept in state plus the new ones and re-arms the timeout. Nothing is
         * emitted here; output is produced on eviction only.
         */
        def handleEvents(userId: Long, events: Iterator[GameEvents],
            state: GroupState[List[SessionWindow]]): Iterator[IncompleteGameSession] = {
          // Re-sessionize from individual events instead of merging already merged sessions:
          // a late event then ends up at its chronological position and in the right session.
          // sortBy is stable, so events sharing a timestamp keep their arrival order.
          val storedEvents = state.getOption.getOrElse(Nil).flatMap(_.events)
          val sortedSessions = (storedEvents ++ events).map(assignSession).sortBy(_.startTime)
          println(sortedSessions.mkString("Sessions to merge: ========= ", ",", " ======="))

          // the accumulator holds the sessions built so far, latest session first
          val newSessions = sortedSessions.foldLeft(List.empty[SessionWindow]) {
            case (current :: finished, session) if intersectingSessions(current, session) =>
              mergeSession(current, session) :: finished
            case (sessions, session) =>
              session :: sessions
          }.reverse

          if (newSessions.isEmpty) {
            // not expected (this is only reached with input events), but do not fail on it
            state.remove()
          } else {
            state.update(newSessions)
            // Set timeout to the earliest session end: we will traverse and evict sessions.
            // The timeout must not be older than the watermark, hence the clamp; a session
            // which is already behind the watermark is evicted once the watermark advances.
            val earliestEnd = newSessions.map(_.maxTimestamp).min
            state.setTimeoutTimestamp(Math.max(earliestEnd, state.getCurrentWatermarkMs()))
          }

          outputMode match {
            case s if s == OutputMode.Append() => Iterator.empty
            case s => throw new UnsupportedOperationException(s"Not supported output mode $s")
          }
        }

        if (state.hasTimedOut) {
          handleEvict(userId, state)
        } else {
          handleEvents(userId, events, state)
        }
      }

    val incompleteGameSessions = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState[List[SessionWindow], IncompleteGameSession](
        outputMode, timeoutConf = GroupStateTimeout.EventTimeTimeout())(stateFunc)

    val query = incompleteGameSessions
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime(1000))
      .queryName("find-incomplete-game-sessions-from-game-events")
      .start()

    ingestData(inputData)

    query.awaitTermination()
  }

  /**
   * Feeds a fixed scenario for one random user into `inputData`: a session closed by
   * logout with one unfinished game, game events sent while logged out, a second login
   * with a game whose end arrives only after the inactivity gap, and finally an event
   * of another user which only serves to advance the watermark.
   *
   * Each tuple is (userId, eventType, gameSessionId, timestamp in epoch milliseconds).
   */
  def ingestData(inputData: MemoryStream[(Long, String, Long, Long)]) = {
    // initialize random number generator
    val rand = new Random()

    // Game session ids must be non-negative: the detection only considers ids >= 0
    // (-1 marks "no game"), so a raw nextLong() would be ignored about half of the time.
    def newGameSessionId(): Long = rand.nextLong() & Long.MaxValue

    val startTime = Calendar.getInstance.getTimeInMillis
    var curTime = startTime
    val userId = rand.nextLong()

    // login
    inputData.addData((userId, GameEventTypes.LOG_IN, -1L, curTime))

    curTime += 500
    val gameSessionId = newGameSessionId()
    // start game
    inputData.addData((userId, GameEventTypes.GAME_START, gameSessionId, curTime))

    curTime += 2000
    // end game
    inputData.addData((userId, GameEventTypes.GAME_END, gameSessionId, curTime))

    curTime += 1000
    val gameSessionId2 = newGameSessionId()
    // start game
    inputData.addData((userId, GameEventTypes.GAME_START, gameSessionId2, curTime))

    curTime += 2000
    // log out
    inputData.addData((userId, GameEventTypes.LOG_OUT, -1L, curTime))

    curTime += 500
    val gameSessionId3 = newGameSessionId()
    // game start event after logged out (invalid)
    inputData.addData((userId, GameEventTypes.GAME_START, gameSessionId3, curTime))

    curTime += 1500
    // game end event after logged out (invalid)
    inputData.addData((userId, GameEventTypes.GAME_END, gameSessionId3, curTime))

    curTime += 500
    // new login
    inputData.addData((userId, GameEventTypes.LOG_IN, -1L, curTime))

    curTime += 1000
    val gameSessionId4 = newGameSessionId()
    inputData.addData((userId, GameEventTypes.GAME_START, gameSessionId4, curTime))

    curTime += 500
    inputData.addData((userId, GameEventTypes.GAME_END, gameSessionId4, curTime))

    val gameSessionId5 = newGameSessionId()
    curTime += 2000
    inputData.addData((userId, GameEventTypes.GAME_START, gameSessionId5, curTime))

    // out of session
    curTime += (6 * 60 * 1000)
    // game end signaled after session is closed
    inputData.addData((userId, GameEventTypes.GAME_END, gameSessionId5, curTime))

    // add artificial event for other user to force advancing watermark
    inputData.addData((userId - 1, GameEventTypes.LOG_OUT, -1L, curTime + (6 * 60 * 1000)))
  }
}
/**
 * A single event sent by a game client.
 *
 * @param userId        user the event belongs to
 * @param eventType     kind of the event, one of the constants in `GameEventTypes`
 * @param gameSessionId game the event refers to; the sample data uses -1 for events
 *                      which are not bound to a game (login / logout)
 * @param timestamp     event time
 */
case class GameEvents(
    userId: Long,
    eventType: String,
    gameSessionId: Long,
    timestamp: Timestamp)
/**
 * A session of one user together with the events collected in it.
 *
 * @param startTime  event time (epoch ms) of the earliest event of the session
 * @param endTime    exclusive end (epoch ms) of the inactivity window; not meaningful for a
 *                   session consisting of a logout event only
 * @param events     events belonging to the session
 * @param sessionEnd true when the session has been closed explicitly by a logout event
 */
case class SessionWindow(startTime: Long, endTime: Long, events: List[GameEvents], sessionEnd: Boolean) {

  /**
   * The last instant (epoch ms) which still belongs to the session; the session can be
   * evicted once the watermark has passed it.
   *
   * For a session closed by logout this is the time of its latest event (the logout),
   * not `startTime`: once the logout has been merged with earlier events, `startTime`
   * is the time of the first event, and using it would expire the session before late
   * events preceding the logout had a chance to arrive. For an open session it is the
   * last millisecond of the inactivity window.
   */
  def maxTimestamp: Long = {
    if (sessionEnd) {
      // falls back to startTime when the session carries no events
      events.foldLeft(startTime)((latest, evt) => Math.max(latest, evt.timestamp.getTime))
    } else {
      endTime - 1
    }
  }
}
/**
 * Output record: a game which was started by a user but has no matching end event.
 *
 * @param userId        user who started the game
 * @param gameSessionId the game without an end event
 */
case class IncompleteGameSession(
    userId: Long,
    gameSessionId: Long)
/** Values used for the `eventType` field of game events. */
object GameEventTypes {
  /** The user logged in. */
  val LOG_IN: String = "login"

  /** A game has been started. */
  val GAME_START: String = "game_start"

  /** A game has been finished. */
  val GAME_END: String = "game_end"

  /** The user logged out. */
  val LOG_OUT: String = "logout"
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment