Created
November 4, 2018 18:29
-
-
Save Jayvardhan-Reddy/5ae3dc4388013339afcc289e4b8b2b87 to your computer and use it in GitHub Desktop.
Itversity_Spark_Streaming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to a Kafka topic

# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem

# Describe/configure the source: follow the webserver access log,
# surviving log rotation (tail -F retries when the file is recreated)
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sink: publish each log line as a Kafka message
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
wk.sinks.kafka.topic = fkdemojv

# Use a channel which buffers events in memory
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.streaming.{StreamingContext,Seconds} | |
import org.apache.spark.streaming.kafka._ | |
import kafka.serializer.StringDecoder | |
/**
 * Streaming job that counts per-department traffic from web-server access-log
 * lines published to a Kafka topic, writing each 30-second batch's counts to HDFS.
 *
 * args(0) is the Spark master URL (e.g. "yarn-client" or "local[2]").
 */
object KafkaStreamingDepartmentCount {
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage hint instead of an opaque ArrayIndexOutOfBoundsException.
    require(args.length >= 1, "Usage: KafkaStreamingDepartmentCount <spark-master>")

    val conf = new SparkConf().setAppName("Kafka Streaming Department Count").setMaster(args(0))
    // 30-second micro-batches.
    val ssc = new StreamingContext(conf, Seconds(30))

    //val kafkaParams = Map[String, String]("metadata.broker.list" -> "wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667")
    val topicSet = Set("fkdemojv")

    // Create a direct stream that consumes the Kafka topic. Messages arrive as
    // (key, value) pairs in binary form; StringDecoder converts both to String.
    // Signature: KafkaUtils.createDirectStream[keyClass, valueClass, keyDecoderClass,
    //   valueDecoderClass](streamingContext, kafkaParams (broker list), topics)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Each record is a (key, value) tuple; only the value (the log line) is needed.
    val messages = stream.map(_._2)

    // The request endpoint is the 7th whitespace-separated field of an access-log
    // line (e.g. "... GET /department/fitness/products HTTP/1.1 ...").
    // lift(6) drops malformed/short lines instead of throwing and killing the job
    // (the original indexed split(" ")(6) unguarded).
    val departmentMessages = messages.filter { msg =>
      msg.split(" ").lift(6).exists { endPoint =>
        val parts = endPoint.split("/")
        parts.length > 1 && parts(1) == "department"
      }
    }

    // Emit (departmentName, 1) per department request; guarded the same way so a
    // bare "/department" path (no name segment) is skipped rather than crashing.
    val departments = departmentMessages.flatMap { line =>
      for {
        endPoint <- line.split(" ").lift(6)
        dept     <- endPoint.split("/").lift(2)
      } yield (dept, 1)
    }

    // Per-batch request count by department.
    val departmentTraffic = departments.reduceByKey(_ + _)
    departmentTraffic.saveAsTextFiles("/user/jvanchir/departmentwisetraffic/cnt")

    ssc.start()
    ssc.awaitTermination()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the Spark 1.6 / Scala 2.10 version of the demo.
name := "SparkStreamDemo"

version := "1.0"

scalaVersion := "2.10.6"

// Single source of truth for the Spark version so all modules stay in sync.
val sparkVersion = "1.6.2"

// %% appends the Scala binary suffix (_2.10) automatically, so the artifact
// ids cannot drift out of sync with scalaVersion.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume-sink" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies for the build.sbt file (Spark 2.x supported)
name := "SparkStreamDemo"

version := "0.1"

scalaVersion := "2.11.12"

val sparkVersion = "2.3.0"

resolvers ++= Seq(
  // HTTPS: sbt 1.3+ rejects plain-HTTP resolvers by default.
  "apache-snapshots" at "https://repository.apache.org/snapshots/"
)

// %% appends the Scala binary suffix (_2.11) automatically, and every Spark
// module uses the same sparkVersion so the classpath cannot mix incompatible
// builds (the original mixed 2.3.0, 2.3.2 and a 1.6.2 _2.10 artifact).
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume-sink" % sparkVersion,
  "org.apache.commons" % "commons-lang3" % "3.2.1",
  // BUG FIX: the original pulled "spark-streaming-kafka_2.10" % "1.6.2", which
  // is binary-incompatible with Scala 2.11 / Spark 2.3. The kafka-0-8 artifact
  // keeps the same createDirectStream + StringDecoder API the job uses.
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  // Pin scala-library to scalaVersion (the original listed 2.11.7, which would
  // cause a version eviction conflict against 2.11.12).
  "org.scala-lang" % "scala-library" % "2.11.12",
  "mysql" % "mysql-connector-java" % "5.1.6"
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment