koushikmln / KafkaMaprBuild.sbt
Created May 25, 2018 05:27
Dependencies for Streaming Pipelines Demo on MapR Cluster
// Build definition for the Kafka/Spark streaming workshop project.
name := "KafkaWorkshopMapr"
version := "0.1"
scalaVersion := "2.11.11"

// Single place to change the Spark release used by every Spark module below.
val sparkVersion = "2.2.1"

// `%%` appends the Scala binary version derived from scalaVersion (`_2.11` here),
// so the artifact suffix cannot drift away from the compiler version.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
koushikmln / logstash.config
Last active May 26, 2018 09:23
Logstash Config File for Parsing HTTP Logs
# Read the access log at the path below and give each event the type "apache_access".
input {
file {
path => ["/opt/gen_logs/logs/access.log"]
type => "apache_access"
# NOTE(review): the closing braces for `file` and `input` are not visible in this excerpt — confirm against the full config.
# Parse the "message" field with the combined Apache log pattern; whatever follows it is captured as `extra_fields`.
filter {
grok {
match => [
"message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}",
# Submit the CountryVisitCount class from kafkaworkshopmapr_2.11-0.1.jar to YARN,
# using the spark-submit under /opt/mapr/spark/spark-2.2.1.
#   --conf spark.ui.port=4926  pins the Spark UI port.
#   --jars                     receives every .jar under /external_jars joined with commas
#                              (the echo/tr approach assumes no jar path contains a space).
#   prod                       is passed to the application as its first argument.
/opt/mapr/spark/spark-2.2.1/bin/spark-submit \
--class CountryVisitCount \
--master yarn \
--conf spark.ui.port=4926 \
--jars $(echo /external_jars/*.jar | tr ' ' ',') \
kafkaworkshopmapr_2.11-0.1.jar prod
// Load the application configuration and select the sub-config named by the first program argument.
val conf = ConfigFactory.load
val envProps: Config = conf.getConfig(args(0))
// Streaming context for the "SiteTraffic" app with master "yarn"; the Seconds(...) duration is read
// from the "window" setting of the selected sub-config.
val sparkConf = new SparkConf().setMaster("yarn").setAppName("SiteTraffic")
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window")))
// Broadcast the selected settings through the underlying SparkContext.
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
// Presumably the Kafka topic(s) to subscribe to — the consuming code is not visible in this excerpt.
val topicsSet = Set("retail_logs")
// Capture the current time once, here, and broadcast that fixed value.
val now = Calendar.getInstance().getTime()
val timestamp = streamingContext.sparkContext.broadcast(now)
// Kafka parameters; "bootstrap.servers" is filled from the "bootstrap.server" setting.
// NOTE(review): the remaining entries of this map are not visible in this excerpt.
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> envProps.getString("bootstrap.server"),
/**
  * Opens an HBase connection using the ZooKeeper settings found in `config`.
  *
  * @param config settings that must provide the string keys `zookeeper.quorum` and `zookeeper.port`
  * @return a newly created HBase connection; it is not closed anywhere in this method
  */
def getHbaseConnection(config: Config): Connection = {
  // Create HBase configuration object
  val hBaseConf: Configuration = HBaseConfiguration.create()
  hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
  // The port has to be stored under the client-port property name; a value stored under
  // an empty key is never looked up, so the configured port would be silently ignored.
  hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
  // Establish the connection and return it as the method result
  ConnectionFactory.createConnection(hBaseConf)
}
# dev profile: ZooKeeper and the Kafka bootstrap server both point at localhost.
dev.zookeeper = localhost:2181
dev.bootstrap.server = localhost:9092
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
# window: integer setting used as the streaming Seconds(...) duration.
dev.window = 20
# prod profile.
# NOTE(review): the host lists below contain only separators in this copy, and no prod.window
# entry is visible — fill in / confirm against the real file before use.
prod.zookeeper =,,
prod.zookeeper.quorum =,,
prod.zookeeper.port = 5181
prod.bootstrap.server =,,
koushikmln /
Last active July 6, 2018 05:28
Lambda Functions Examples
# A lambda is an anonymous single-expression function; binding it to a name lets it be reused.
fun = lambda x: x * x
fun(2)  # Will return 2*2 i.e. 4


def sumOfInt(n, fun):
    """Return fun(0) + fun(1) + ... + fun(n).

    n   -- inclusive upper bound; a negative n gives 0 because the range is empty.
    fun -- one-argument function applied to each integer before it is added.
    """
    # The built-in sum() replaces the manual accumulator, which shadowed the name `sum`.
    return sum(fun(i) for i in range(n + 1))


sumOfInt(5, lambda x: x * x)  # Will return sum of squares till 5 i.e. 55
koushikmln /
Last active July 6, 2018 05:42
Python Map Reduce Filter Examples
# map(): takes a function and a list, applies the function to every element and
# returns a map object (wrapped in list() here so the values are materialised).
list1 = [1, 2, 3, 4]
list(map(lambda n: n * n, list1))  # [1, 4, 9, 16]

# reduce(): feeds the function two values at a time - the running result and the
# next item of the sequence - until a single value is left.
from functools import reduce

list2 = list(range(1, 11))
reduce(lambda acc, item: acc + item, list2)  # 55

list2 = list(range(1, 11))
koushikmln /
Last active July 8, 2018 03:52
Process Order Items CSV to get Order Id, Sub-Total Tuples, Total Amount by Order Id and Revenue Per Order Collection
#Problem Statement 1
#Get (order_id, sub_total) tuple from order items csv using map function.
def getOrderItemTuples(order_items):
    """Turn order-item CSV lines into (order_id, sub_total) tuples.

    order_items -- iterable of comma-separated lines; field 1 is parsed as int
                   and field 4 as float.
    Returns a list with one tuple per input line, in input order.
    Raises IndexError for a line with fewer than five fields and ValueError
    when a field cannot be parsed as a number.
    """
    def toTuple(line):
        # Split once and reuse the fields instead of splitting the line twice.
        fields = line.split(",")
        return (int(fields[1]), float(fields[4]))

    return list(map(toTuple, order_items))
# Load the order-items file as a list of lines. The `with` block closes the file
# handle as soon as the read finishes instead of leaving it open.
with open("/data/retail_db/order_items/part-00000", "r") as order_items_file:
    order_items = order_items_file.read().splitlines()
#Problem Statement 2
#Get the total amount for a particular order using map, reduce, filter.
koushikmln /
Created July 6, 2018 12:48
Spark Example to Give Count of Orders By Status
# Read the orders file as an RDD of text lines (`sc` is the existing SparkContext).
rdd = sc.textFile("/public/retail_db/orders/part-00000")
# Pair field 3 of every comma-separated line with 1, add up the 1s per key and
# collect the (key, count) pairs. The chain is parenthesised so that it
# can span several lines without backslash continuations.
status_count = (
    rdd.map(lambda x: (x.split(",")[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)