@Jayvardhan-Reddy
Created November 4, 2018 18:29
Itversity_Spark_Streaming
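This gist wires up a small end-to-end pipeline: a Flume agent tails the web server access log and publishes each line to the Kafka topic fkdemojv, and a Spark Streaming job consumes that topic in 30-second batches, counts requests per department, and saves the counts to HDFS. The Flume configuration, the Spark Streaming job, and two build.sbt variants (Spark 1.6 and Spark 2.x) follow.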
# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to kafka topic
# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem
# Describe/configure the source
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log
# Describe the sink
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667
wk.sinks.kafka.topic = fkdemojv
# Use a channel which buffers events in memory
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100
# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem
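Once the agent is running, it can help to confirm that events are actually landing on the fkdemojv topic before pointing Spark at it. Below is a minimal, hypothetical verification consumer in Scala using the standard kafka-clients consumer API; the kafka-clients dependency, new-consumer protocol support on these brokers, and the jv-verify group id are all assumptions, not part of the original setup.
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch only: print whatever Flume has published to the fkdemojv topic.
// Assumes "org.apache.kafka" % "kafka-clients" is on the classpath; the group id is illustrative.
object VerifyFkdemojv extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "wn01.itversity.com:6667,wn02.itversity.com:6667")
  props.put("group.id", "jv-verify")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(java.util.Arrays.asList("fkdemojv"))

  // Poll a few times and dump the raw access-log lines.
  for (_ <- 1 to 10) {
    consumer.poll(1000L).asScala.foreach(record => println(record.value()))
  }
  consumer.close()
}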
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object KafkaStreamingDepartmentCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka Streaming Department Count").setMaster(args(0))
    val ssc = new StreamingContext(conf, Seconds(30))

    //val kafkaParams = Map[String, String]("metadata.broker.list" -> "wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667")
    val topicSet = Set("fkdemojv")

    // Consume data from the Kafka topic, i.e. create a direct stream that reads from Kafka.
    // The messages in the stream are (key, value) pairs in binary format, so we decode them to
    // strings with Kafka's StringDecoder. The general form is:
    // val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class],
    //   [value decoder class] ](streamingContext, [map of Kafka parameters (metadata.broker.list / bootstrap.servers)], [set of topics to consume])
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Each record is a (key, value) tuple; we only need the value, i.e. the raw log line.
    val messages = stream.map(s => s._2)

    // Keep only requests whose endpoint starts with /department.
    val departmentMessages = messages.filter(msg => {
      val endPoint = msg.split(" ")(6)
      endPoint.split("/")(1) == "department"
    })

    // Emit (department name, 1) for each matching request.
    val departments = departmentMessages.map(ele => {
      val endPoint = ele.split(" ")(6)
      (endPoint.split("/")(2), 1)
    })

    // Aggregate per-department counts for each 30-second batch and save the results.
    val departmentTraffic = departments.reduceByKey((total, value) => total + value)
    departmentTraffic.saveAsTextFiles("/user/jvanchir/departmentwisetraffic/cnt")

    ssc.start()
    ssc.awaitTermination()
  }
}
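A note on the parsing above: both the filter and the map index directly into msg.split(" ")(6) and the endpoint's path segments, so a truncated or malformed access-log line would throw an ArrayIndexOutOfBoundsException and fail the batch. Below is a minimal, optional sketch of a more defensive extraction using Option and lift; extractDepartment is an illustrative helper name, not part of the original job.
// Hypothetical helper: returns Some(department) only when the line has the expected shape,
// so malformed log lines are dropped instead of throwing.
def extractDepartment(line: String): Option[String] = {
  val fields = line.split(" ")
  for {
    endPoint <- fields.lift(6)
    parts = endPoint.split("/")
    kind <- parts.lift(1) if kind == "department"
    dept <- parts.lift(2)
  } yield dept
}

// Plugged into the stream, it replaces the filter/map pair above:
// val departments = messages.flatMap(extractDepartment).map(dept => (dept, 1))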
name := "SparkStreamDemo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"
// Dependencies for build.sbt file (Spark 2.x supported)
name := "SparkStreamDemo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume-sink" % sparkVersion,
  "org.apache.commons" % "commons-lang3" % "3.2.1",
  // Kafka 0.8 direct-stream integration built for Scala 2.11, matching the Spark version
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  "org.scala-lang" % "scala-library" % "2.11.12",
  "mysql" % "mysql-connector-java" % "5.1.6"
)
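The job above uses the old Kafka 0.8 direct-stream API (createDirectStream with StringDecoder), which the spark-streaming-kafka-0-8 dependency covers. If the Spark 2.x build is moved to the 0-10 integration instead (dependency "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion), the stream creation changes roughly as in the sketch below; ssc is the same StreamingContext as in the job, and the group id and offset settings are illustrative assumptions.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Sketch of the kafka-0-10 equivalent of the createDirectStream call in the job above.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "wn01.itversity.com:6667,wn02.itversity.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "fkdemojv-streaming", // illustrative group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Set("fkdemojv"), kafkaParams))

// Records are ConsumerRecord[String, String]; keep only the value (the raw log line) as before.
val messages = stream.map(record => record.value)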