package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

object FlumePollingStreamClient {
  val logger = Logger(LoggerFactory.getLogger("FlumePollingStreamClient"))

  def main(argv: Array[String]): Unit = {
    logger.debug("Entering FlumePollingStreamClient.main")
    if (argv.length != 3) {
      println("Please provide 3 parameters: <host> <port> <microbatchtime>")
      System.exit(1)
    }
    val hostName = argv(0)
    val port = argv(1).toInt
    val microBatchTime = argv(2).toInt
    logger.debug(s"Polling $hostName:$port, batching records every $microBatchTime seconds")

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePollingStreamClient")
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(microBatchTime))

    // Pull-based receiver: Spark polls a Flume agent that runs Spark's custom SparkSink.
    val stream = FlumeUtils.createPollingStream(sparkStreamingContext, hostName, port)
    // Push-based alternative: Flume pushes events to a Spark Avro receiver.
    // val stream = FlumeUtils.createStream(sparkStreamingContext, hostName, port)

    // For every micro-batch, print each event's headers, schema, and body.
    stream.map { sparkFlumeEvent =>
      val event = sparkFlumeEvent.event
      println("Value of event " + event)
      println("Value of event Header " + event.getHeaders)
      println("Value of event Schema " + event.getSchema)
      val messageBody = new String(event.getBody.array())
      println("Value of event Body " + messageBody)
      messageBody
    }.print()

    // Also print a count of the events received in each micro-batch.
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logger.debug("Exiting FlumePollingStreamClient.main")
  }
}
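Because createPollingStream is pull-based, the Flume agent itself must run Spark's custom sink (org.apache.spark.streaming.flume.sink.SparkSink), with the spark-streaming-flume-sink artifact and its dependencies on the agent's classpath. Below is a minimal sketch of such an agent configuration; the agent name (agent1), the netcat source, and the port numbers are assumptions for illustration, so adjust them to match your setup.

# Hypothetical Flume agent "agent1": netcat source -> memory channel -> SparkSink
agent1.sources = netcatSrc
agent1.channels = memoryChannel
agent1.sinks = sparkSink

# Assumed test source: type lines into "nc localhost 44444" to generate events
agent1.sources.netcatSrc.type = netcat
agent1.sources.netcatSrc.bind = localhost
agent1.sources.netcatSrc.port = 44444
agent1.sources.netcatSrc.channels = memoryChannel

agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 10000

# SparkSink buffers events until the Spark Streaming client polls them
agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sparkSink.hostname = localhost
agent1.sinks.sparkSink.port = 9988
agent1.sinks.sparkSink.channel = memoryChannel

With an agent like this started (for example via flume-ng agent --conf conf --conf-file agent1.conf --name agent1), the client above would be run with matching arguments, e.g. localhost 9988 10.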