Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created July 26, 2019 13:50
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29 to your computer and use it in GitHub Desktop.
Save HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29 to your computer and use it in GitHub Desktop.
[Spark Structured Streaming] An example of a custom session window: a session is closed by a logout event as well as by a period of inactivity
package net.heartsavior.spark
import java.sql.Timestamp
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.util.Random
/**
 * Example of a custom session window on Spark Structured Streaming.
 *
 * Game events are grouped per user into sessions. A session ends either when no event
 * arrives within the inactivity gap, or as soon as a logout event is seen. Once the event
 * time watermark passes the end of a session, the games that were started but never ended
 * inside that session are emitted as [[IncompleteGameSession]] rows.
 */
object CustomSessionWindowExample {

  /** Inactivity gap (in milliseconds) after which an open session is considered finished. */
  private val SessionGapMillis: Long = 5L * 60 * 1000

  /** Game session id carried by events that do not belong to a game (login / logout). */
  private val NoGameSessionId: Long = -1L

  /**
   * Builds the streaming query, starts it with a console sink, feeds it the sample events
   * from [[ingestData]] and blocks until the query terminates.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Sample")
      .master("local[*]")
      .getOrCreate()

    // MemoryStream needs an implicit SQLContext; implicit vals get an explicit type
    implicit val context: org.apache.spark.sql.SQLContext = spark.sqlContext
    import spark.implicits._

    val inputData = MemoryStream[(Long, String, Long, Long)]

    // the 4th tuple field is epoch milliseconds; dividing by 1000 yields the seconds that
    // CAST(... AS TIMESTAMP) expects
    val events = inputData.toDF()
      .selectExpr("_1", "_2", "_3", "CAST(_4 / 1000 AS TIMESTAMP)")
      .as[(Long, String, Long, Timestamp)]
      .map(r => GameEvents(r._1, r._2, r._3, r._4))
      .withWatermark("timestamp", "5 seconds")

    // updateSessions only emits rows when a session is evicted (append semantics), so the
    // same mode is used for both the stateful operator and the sink
    val outputMode = OutputMode.Append()

    val stateFunc: (Long, Iterator[GameEvents], GroupState[List[SessionWindow]])
      => Iterator[IncompleteGameSession] =
      (userId, userEvents, state) => updateSessions(userId, userEvents, state)

    val incompleteGameSessions = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState[List[SessionWindow], IncompleteGameSession](
        outputMode, timeoutConf = GroupStateTimeout.EventTimeTimeout())(stateFunc)

    val query = incompleteGameSessions
      .writeStream
      .format("console")
      .outputMode(outputMode)
      .trigger(Trigger.ProcessingTime(1000))
      .queryName("find-incomplete-game-sessions-from-game-events")
      .start()

    ingestData(inputData)

    query.awaitTermination()
  }

  /**
   * State function for `flatMapGroupsWithState`.
   *
   * When new events arrive, the sessions of the user are rebuilt from the stored events plus
   * the new ones. In every invocation (new data or timeout) the sessions whose
   * `maxTimestamp` is behind the current watermark are evicted and analysed, and the
   * remaining ones are written back to the state.
   *
   * @param userId grouping key
   * @param events new events of this user; empty when invoked because of a timeout
   * @param state  sessions kept for this user
   * @return incomplete game sessions found in the sessions evicted by this invocation
   */
  private def updateSessions(
      userId: Long,
      events: Iterator[GameEvents],
      state: GroupState[List[SessionWindow]]): Iterator[IncompleteGameSession] = {
    val storedSessions = state.getOption.getOrElse(Nil)

    val sessions =
      if (state.hasTimedOut) {
        storedSessions
      } else {
        // Rebuilding from the individual events (instead of merging whole windows) lets a
        // late logout split an already stored session at the right position.
        // sortBy is stable, so events with equal timestamps keep their arrival order.
        val allEvents = (storedSessions.flatMap(_.events) ++ events).sortBy(_.timestamp.getTime)
        val rebuilt = sessionize(allEvents)
        println(rebuilt.mkString("Sessions after merge: ========= ", ",", " ======="))
        rebuilt
      }

    // Evict in both paths: a key that receives data is not handled as timed out, and
    // setTimeoutTimestamp rejects timestamps that are earlier than the watermark.
    val watermarkMs = state.getCurrentWatermarkMs()
    val (evicted, kept) = sessions.partition(_.maxTimestamp < watermarkMs)

    if (kept.isEmpty) {
      state.remove()
    } else {
      state.update(kept)
      // wake up again when the earliest remaining session can be evicted
      state.setTimeoutTimestamp(kept.map(_.maxTimestamp).min)
    }

    evicted.iterator.flatMap(session => findIncompleteGameSessions(userId, session.events))
  }

  /**
   * Splits events into sessions.
   *
   * An event joins the current session when that session is still open and the event is not
   * later than the session's `endTime`; otherwise it starts a new session. A logout event
   * closes the session it joins, so nothing that follows can be added to it.
   *
   * @param sortedEvents events of one user, ordered by timestamp
   * @return sessions ordered by start time, each holding its events in timestamp order
   */
  private def sessionize(sortedEvents: List[GameEvents]): List[SessionWindow] = {
    // sessions and their events are accumulated newest-first to keep prepending O(1)
    val newestFirst = sortedEvents.foldLeft(List.empty[SessionWindow]) { (sessions, event) =>
      val single = singleEventSession(event)
      sessions match {
        case current :: earlier if !current.sessionEnd && single.startTime <= current.endTime =>
          val extended = current.copy(
            endTime = single.endTime,
            events = event :: current.events,
            sessionEnd = single.sessionEnd)
          extended :: earlier
        case _ =>
          single :: sessions
      }
    }
    newestFirst.reverse.map(session => session.copy(events = session.events.reverse))
  }

  /**
   * Wraps one event into its own session: a logout yields a closed session ending at the
   * event itself, any other event an open session ending [[SessionGapMillis]] later.
   */
  private def singleEventSession(event: GameEvents): SessionWindow = {
    val eventTime = event.timestamp.getTime
    if (event.eventType == GameEventTypes.LOG_OUT) {
      SessionWindow(eventTime, eventTime, List(event), sessionEnd = true)
    } else {
      SessionWindow(eventTime, eventTime + SessionGapMillis, List(event), sessionEnd = false)
    }
  }

  /**
   * Finds the games that were started but not ended within one session.
   *
   * Events preceding the first login of the session are ignored, as are events with a
   * negative game session id.
   *
   * @param userId   user owning the session
   * @param elements events of the session in timestamp order
   */
  private def findIncompleteGameSessions(
      userId: Long,
      elements: List[GameEvents]): List[IncompleteGameSession] = {
    println(elements.mkString("========= ", ",", " ======="))

    // side-output is not supported, so events before the login are simply dropped
    val eventsSinceLogin = elements.dropWhile(_.eventType != GameEventTypes.LOG_IN)

    def gameIdsOf(eventType: String): Set[Long] =
      eventsSinceLogin.collect {
        case evt if evt.eventType == eventType && evt.gameSessionId >= 0 => evt.gameSessionId
      }.toSet

    val started = gameIdsOf(GameEventTypes.GAME_START)
    val ended = gameIdsOf(GameEventTypes.GAME_END)
    started.diff(ended).map(gameNo => IncompleteGameSession(userId, gameNo)).toList
  }

  /**
   * Feeds a fixed scenario for one random user into the stream: a session closed by logout
   * with one unfinished game, events sent while logged out, a second session that ends by
   * inactivity with one unfinished game, and finally an event of another user that is far
   * enough in the future to advance the watermark.
   *
   * @param inputData stream of (userId, eventType, gameSessionId, epoch millis)
   * @return the result of the last `addData` call
   */
  def ingestData(inputData: MemoryStream[(Long, String, Long, Long)]) = {
    val rand = new Random()

    // negative ids are ignored by the detection logic, so only draw non-negative ones
    def nextGameSessionId(): Long = rand.nextLong() & Long.MaxValue

    var curTime = Calendar.getInstance.getTimeInMillis
    val userId = rand.nextLong()

    // sends an event of the sample user stamped with the current value of the fake clock
    def emit(eventType: String, gameSessionId: Long) =
      inputData.addData((userId, eventType, gameSessionId, curTime))

    // login
    emit(GameEventTypes.LOG_IN, NoGameSessionId)

    // first game: started and ended
    curTime += 500
    val gameSessionId = nextGameSessionId()
    emit(GameEventTypes.GAME_START, gameSessionId)
    curTime += 2000
    emit(GameEventTypes.GAME_END, gameSessionId)

    // second game: started, but the user logs out before it ends
    curTime += 1000
    val gameSessionId2 = nextGameSessionId()
    emit(GameEventTypes.GAME_START, gameSessionId2)
    curTime += 2000
    emit(GameEventTypes.LOG_OUT, NoGameSessionId)

    // game start / end events after logged out (invalid)
    curTime += 500
    val gameSessionId3 = nextGameSessionId()
    emit(GameEventTypes.GAME_START, gameSessionId3)
    curTime += 1500
    emit(GameEventTypes.GAME_END, gameSessionId3)

    // new login
    curTime += 500
    emit(GameEventTypes.LOG_IN, NoGameSessionId)

    // fourth game: started and ended
    curTime += 1000
    val gameSessionId4 = nextGameSessionId()
    emit(GameEventTypes.GAME_START, gameSessionId4)
    curTime += 500
    emit(GameEventTypes.GAME_END, gameSessionId4)

    // fifth game: the end is signaled only after the session ran out by inactivity
    val gameSessionId5 = nextGameSessionId()
    curTime += 2000
    emit(GameEventTypes.GAME_START, gameSessionId5)
    curTime += (6 * 60 * 1000)
    emit(GameEventTypes.GAME_END, gameSessionId5)

    // add artificial event for other user to force advancing watermark
    inputData.addData(
      (userId - 1, GameEventTypes.LOG_OUT, NoGameSessionId, curTime + (6 * 60 * 1000)))
  }
}

/**
 * A single event reported for a user.
 *
 * @param userId        user the event belongs to
 * @param eventType     one of the values defined in [[GameEventTypes]]
 * @param gameSessionId id of the game the event refers to; the sample data uses -1 for
 *                      login / logout events
 * @param timestamp     event time
 */
case class GameEvents(userId: Long, eventType: String, gameSessionId: Long, timestamp: Timestamp)

/**
 * A session of one user.
 *
 * @param startTime  timestamp (epoch millis) of the first event
 * @param endTime    for an open session: exclusive end, i.e. the timestamp of the last event
 *                   plus the inactivity gap; for a closed session: timestamp of the logout
 * @param events     events of the session in timestamp order
 * @param sessionEnd true when the session was closed by a logout event
 */
case class SessionWindow(startTime: Long, endTime: Long, events: List[GameEvents], sessionEnd: Boolean) {

  /**
   * Latest timestamp that still belongs to this session; the session is complete once the
   * watermark has passed it.
   */
  def maxTimestamp: Long = {
    if (sessionEnd) {
      // the logout itself is the last instant of a closed session; max() keeps the result
      // sane for windows built with a placeholder end such as -1
      math.max(startTime, endTime)
    } else {
      endTime - 1
    }
  }
}
/**
 * Output record of the example: a game session of a user for which a start event, but no
 * matching end event, was found.
 *
 * @param userId        user who started the game
 * @param gameSessionId id of the game that was started but not ended
 */
case class IncompleteGameSession(
    userId: Long,
    gameSessionId: Long)
/** String values used in the `eventType` field of a game event. */
object GameEventTypes {

  /** The user logged in. */
  val LOG_IN: String = "login"

  /** A game was started. */
  val GAME_START: String = "game_start"

  /** A game was ended. */
  val GAME_END: String = "game_end"

  /** The user logged out. */
  val LOG_OUT: String = "logout"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment