Skip to content

Instantly share code, notes, and snippets.

@newfront
Created March 14, 2019 05:38
Show Gist options
  • Save newfront/f514449467096c849550d26704e79e7d to your computer and use it in GitHub Desktop.
MemoryStream Example
/**
 * Integration-style test that drives [[EventAggregation]] with a
 * [[MemoryStream]] source and a "memory" sink, then asserts on the
 * aggregated result via Spark SQL.
 */
class EventMemoryStreamSpec extends FunSuite with Matchers with SparkSqlTest {

  val log: Logger = LoggerFactory.getLogger(classOf[EventAggregation])
  private val pathToTestScenarios = "src/test/resources/scenarios"
  lazy val session: SparkSession = sparkSql

  override def conf: SparkConf = {
    new SparkConf()
      .setMaster("local[*]")
      .setAppName("aggregation-test-app")
      // NOTE: "spark.ui.enabled" was set twice in the original; deduplicated.
      .set("spark.ui.enabled", "false")
      .set("spark.app.id", appID)
      .set("spark.driver.host", "localhost")
      .set("spark.sql.shuffle.partitions", "32")
      .set("spark.executor.cores", "4")
      .set("spark.executor.memory", "1g")
      .setJars(SparkContext.jarOfClass(classOf[EventAggregation]).toList)
  }

  // Per-run temp dir so checkpoints never collide across test invocations.
  protected val checkpointDir: String = Files.createTempDirectory(appID).toString

  /**
   * Builds the test [[AppConfiguration]] with its checkpoint path redirected
   * to the per-run temp directory.
   *
   * BUG FIX: `copy` on an immutable case class returns a NEW instance; the
   * original code discarded that copy and returned the unmodified
   * `baseConfig`, so `checkpointDir` was never actually applied.
   */
  def appConfigForTest(): AppConfiguration = {
    val baseConfig = AppConfig("src/test/resources/app.yaml")
    baseConfig.copy(checkpointPath = checkpointDir)
  }

  test("Should aggregate call events") {
    implicit val sqlContext: SQLContext = session.sqlContext
    import session.implicits._

    val appConfig = appConfigForTest()
    val scenario = TestHelper.loadScenario[CallEvent](s"$pathToTestScenarios/pdd_events.json")
    val scenarioIter = scenario.toIterator
    scenario.nonEmpty shouldBe true

    // Reuse the config built above instead of calling appConfigForTest() twice.
    // NOTE(review): trendDiscoveryApp is never used below — kept in case its
    // constructor has required side effects; confirm and remove if not.
    val trendDiscoveryApp = new TrendDiscoveryApp(appConfig, session)

    val kafkaData = MemoryStream[MockKafkaDataFrame]
    val processingTimeTrigger = Trigger.ProcessingTime(2.seconds)
    val eventAggregation = EventAggregation(appConfig)

    // Stream into an in-memory table we can query with plain SQL afterwards.
    val processingStream = eventAggregation.process(kafkaData.toDF())(session)
      .writeStream
      .format("memory")
      .queryName("calleventaggs")
      .outputMode(eventAggregation.outputMode)
      .trigger(processingTimeTrigger)
      .start()

    // Feed all 22 scenario events in three micro-batches (11 + 10 + 1);
    // the shared iterator advances across the take() calls.
    kafkaData.addData(scenarioIter.take(11).map(TestHelper.asMockKafkaDataFrame))
    processingStream.processAllAvailable()
    kafkaData.addData(scenarioIter.take(10).map(TestHelper.asMockKafkaDataFrame))
    processingStream.processAllAvailable()
    kafkaData.addData(scenarioIter.take(1).map(TestHelper.asMockKafkaDataFrame))
    processingStream.processAllAvailable()

    // Average p99 over the aggregated output, rounded for a stable comparison.
    val res = session
      .sql("select avg(stats.p99) from calleventaggs")
      .collect().map { r => r.getAs[Double](0) }.head
    DiscoveryUtils.round(res) shouldEqual 7.56

    processingStream.stop()
  }
}
@rvramanan0
Copy link

Hi Scott,

Could I get the complete test scripts?

Thanks and regards,
Venkat

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment