Skip to content

Instantly share code, notes, and snippets.

@wangchenyi
Last active April 10, 2018 18:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wangchenyi/f9922df2589eab3a59b6dc4a4369116f to your computer and use it in GitHub Desktop.
Save wangchenyi/f9922df2589eab3a59b6dc4a4369116f to your computer and use it in GitHub Desktop.
/**
 * State-update function passed to `mapGroupsWithState`: keeps a running event
 * count per pageview id in the group's [[PageViewInfo2]] state.
 *
 * @param pvid   pageview id the group is keyed by
 * @param events records received for `pvid` in the current trigger; this is a
 *               one-shot `Iterator`, so it is traversed exactly once here
 * @param state  per-key state holding the cumulative event count
 * @return `(pvid, cumulative event count)` when the key has timed out,
 *         otherwise `(pvid, number of events received in this trigger)`
 */
def updatePageviewDatawithState(pvid: String,
                                events: Iterator[CLogData],
                                state: GroupState[PageViewInfo2]): (String, Int) = {
  if (state.hasTimedOut) {
    // getOption rather than get: a missing state yields 0 instead of throwing.
    val totalEvents = state.getOption.fold(0)(_.numEvents)
    // Drop the expired entry; no new timeout is set in this branch, so leaving
    // it in place would keep this key's state around after its final count
    // has been emitted.
    state.remove()
    (pvid, totalEvents)
  } else {
    // Count once and reuse the value: calling `size` exhausts the iterator,
    // so a second `events.size` would always return 0.
    val newEvents = events.size
    val previousEvents = state.getOption.fold(0)(_.numEvents)
    state.update(PageViewInfo2(None, previousEvents + newEvents, 0L, 0L))
    // Re-arm the inactivity timeout every time the key receives data.
    state.setTimeoutDuration("2 minutes")
    (pvid, newEvents)
  }
}
// Per-pageview update stream: keep records flagged by `isVaild` (spelling as
// declared on the record type), key them by pageview id, and run the stateful
// counter with a processing-time timeout.
// NOTE(review): `pvid.get` presumably relies on `isVaild` implying that `pvid`
// is defined — confirm against the record type.
val pageviewUpdates =
  clogs
    .filter(clog => clog.isVaild)
    .groupByKey(clog => clog.pvid.get)
    .mapGroupsWithState[PageViewInfo2, (String, Int)](
      GroupStateTimeout.ProcessingTimeTimeout
    )(updatePageviewDatawithState)
// Start a streaming query named "result" that writes the pageview updates to
// the console sink in update mode, triggering every 30 seconds.
val query = {
  val triggerIntervalMs = 30 * 1000L
  pageviewUpdates.writeStream
    .queryName("result")
    .outputMode(OutputMode.Update())
    .trigger(Trigger.ProcessingTime(triggerIntervalMs))
    .format("console")
    .start()
}

// Wait here until the streaming query terminates.
query.awaitTermination()
/**
 * Per-pageview state used by `updatePageviewDatawithState`.
 *
 * @param data             optional pageview payload
 * @param numEvents        number of events counted for the pageview
 * @param startTimestampMs start timestamp, in milliseconds
 * @param endTimestampMs   end timestamp, in milliseconds
 */
case class PageViewInfo2(
    data: Option[PageViewData],
    numEvents: Int,
    startTimestampMs: Long,
    endTimestampMs: Long
) {

  /** Difference between the end and start timestamps, in milliseconds. */
  def durationMs: Long =
    endTimestampMs - startTimestampMs
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment