@newfront
Last active March 3, 2021 19:29
Simple Memory Stream

A ScalaTest spec that exercises a Spark Structured Streaming job end to end without Kafka: events are fed through a MemoryStream source, processed by EventAggregation, written to the in-memory sink, and asserted on with plain SQL.
test("Should aggregate call events") {
implicit val sqlContext: SQLContext = spark.sqlContext
import spark.implicits._
val scenario = TestHelper.loadScenario[CallEvent](s"$pathToTestScenarios/pdd_events.json")
val scenarioIter = scenario.toIterator
scenario.nonEmpty shouldBe true
val kafkaData = MemoryStream[MockKafkaDataFrame]
val processingStream = EventAggregation(appConfig).process(kafkaData.toDF())(session)
.writeStream
.format("memory")
.queryName("calleventaggs")
.outputMode(eventAggregation.outputMode)
.start()
// send 11 events into the streaming application
kafkaData.addData(scenarioIter.take(11).map(TestHelper.asMockKafkaDataFrame))
// force spark to trigger
processingStream.processAllAvailable()
val res = spark.sql("select avg(stats.p99) from calleventaggs")
.collect()
.map(_.getAs[Double](0))
.head
DiscoveryUtils.round(res) shouldEqual 7.56
processingStream.stop()
}
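
To try the same source-to-sink pattern outside of this project, the sketch below shows the technique in isolation: push rows into a MemoryStream, write the streaming result to the memory sink, and query it with SQL. Every name here (ToyEvent, MemoryStreamSketch, toy_aggs) is a hypothetical stand-in and not part of the gist; the real test relies on EventAggregation, MockKafkaDataFrame, and TestHelper, which are defined elsewhere in the project.

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.avg

// Toy event type (hypothetical, not from the gist)
case class ToyEvent(id: String, latencyMs: Double)

object MemoryStreamSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("memory-stream-sketch")
    .getOrCreate()

  // MemoryStream needs an implicit SQLContext and an Encoder for ToyEvent
  implicit val sqlContext: SQLContext = spark.sqlContext
  import spark.implicits._

  // 1. In-memory source stands in for Kafka
  val source = MemoryStream[ToyEvent]

  // 2. A trivial "aggregation": average latency per id
  val aggregated = source.toDF()
    .groupBy($"id")
    .agg(avg($"latencyMs").as("avgLatency"))

  // 3. Write to the in-memory sink so results can be queried by name
  val query = aggregated.writeStream
    .format("memory")
    .queryName("toy_aggs")
    .outputMode("complete")
    .start()

  // 4. Push data and force a micro-batch, exactly as the test above does
  source.addData(ToyEvent("a", 10.0), ToyEvent("a", 20.0), ToyEvent("b", 5.0))
  query.processAllAvailable()

  spark.sql("select * from toy_aggs").show()

  query.stop()
  spark.stop()
}

processAllAvailable() blocks until every record added so far has been processed, which is what makes this style of streaming test deterministic: the SQL assertion only runs after the micro-batch has landed in the memory sink.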