/**
 * Periodically logs the number of records processed so far and the observed
 * throughput (records per second). Intended to run on a dedicated monitoring
 * thread: it loops once per second until the thread is interrupted.
 *
 * @param pointsRecordCounter accumulator incremented by the processing job
 */
private def monitorRecordsProcessed(pointsRecordCounter: LongAccumulator): Unit = {
  val startTime = System.currentTimeMillis()
  log.info("Records monitoring started")
  try {
    // Exit cleanly when the owning application interrupts this thread
    // (the original `while (true)` could never terminate).
    while (!Thread.currentThread().isInterrupted) {
      val timeInSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)
      val recordsCount = pointsRecordCounter.sum
      // Clamp the divisor to 1: during the first second of monitoring
      // timeInSec is 0 and the original logged Infinity/NaN throughput.
      val tp = recordsCount.toFloat / math.max(timeInSec, 1L)
      log.info(s"Records processed ${recordsCount} in ${timeInSec} sec , throughput ${tp} / sec")
      Thread.sleep(TimeUnit.SECONDS.toMillis(1))
    }
  } catch {
    case _: InterruptedException =>
      // Sleep was interrupted: restore the interrupt flag and stop monitoring.
      Thread.currentThread().interrupt()
  }
}

/**
 * Counts occurrences of the first CSV column across all lines of the input
 * and returns the most frequent keys with their counts, sorted descending.
 *
 * @param path                input location readable by `sparkContext.textFile`
 * @param pointsRecordCounter accumulator bumped once per input record, read by
 *                            the monitoring thread
 * @param sparkSession        active Spark session used to build the RDD
 * @param topN                how many top keys to return; defaults to 10,
 *                            preserving the original hard-coded behavior
 * @return array of (key, count) pairs in descending count order
 */
private def processData(path: String,
                        pointsRecordCounter: LongAccumulator,
                        sparkSession: SparkSession,
                        topN: Int = 10) = {
  sparkSession.sparkContext.textFile(path)
    .map(_.split(","))
    .map { row =>
      // Count every record seen so the monitor can report throughput.
      pointsRecordCounter.add(1)
      (row(0), 1)
    }
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .take(topN)
}