Skip to content

Instantly share code, notes, and snippets.

Avatar

Koushik M.L.N koushikmln

View GitHub Profile
@koushikmln
koushikmln / logstash.repo
Created Jul 16, 2018
Logstash Repository for Cent Os
View logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
@koushikmln
koushikmln / OrderItemsSpark.py
Created Jul 7, 2018
Process Order Items Using Spark to get Order Id, Sub-Total Tuples, Total Amount by Order Id and Revenue Per Order Collection
View OrderItemsSpark.py
# Use map to create an rdd of (order_id, sub_total) tuple.
rdd = sc.textFile("/public/retail_db/order_items/part-00000")
orderItemTuple = rdd.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
orderItemTuple.take(10)
# Get total for particular order_id
orderItemTuple.filter(lambda x: x[0] == 2).reduce(lambda x, y: (x[0], x[1] + y[1]))
# Get order_id,total tuple
orderItemTuple.reduceByKey(lambda x, y: x + y).take(10)
@koushikmln
koushikmln / Hadoop.sh
Created Jul 6, 2018
Hadoop Fs Commands
View Hadoop.sh
hadoop fs -put /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Set Blocksize
hadoop fs -D dfs.blocksize=67108864 -put /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Set Replication Factor and Black Size
hadoop fs -D dfs.blocksize=67108864 -D dfs.replication=1 -put /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Get File Metadata
hdfs fsck /user/koushikmln/retail_db_order_items.csv
@koushikmln
koushikmln / SparkExample.py
Created Jul 6, 2018
Spark Example to Give Count of Orders By Status
View SparkExample.py
rdd = sc.textFile("/public/retail_db/orders/part-00000")
status_count = rdd.map(lambda x: (x.split(",")[3], 1))
.reduceByKey(lambda a,b: a + b).collect()
@koushikmln
koushikmln / OrderItems.py
Last active Jul 8, 2018
Process Order Items CSV to get Order Id, Sub-Total Tuples, Total Amount by Order Id and Revenue Per Order Collection
View OrderItems.py
#Problem Statement 1
#Get (order_id, sub_total) tuple from order items csv using map function.
def getOrderItemTuples(order_items):
return list(map(lambda x: (int(x.split(",")[1]),float(x.split(",")[4])),order_items))
order_items = open("/data/retail_db/order_items/part-00000","r").read().splitlines()
getOrderItemTuples(order_items[:20])
#Problem Statement 2
#Get the total amount for a particular order using map, reduce, filter.
@koushikmln
koushikmln / examples.py
Last active Jul 6, 2018
Python Map Reduce Filter Examples
View examples.py
#MAP Function, takes input as function and list. applies function on list and returns the value as a map object
list1 = [1,2,3,4]
list(map(lambda x:x*x,list1)) # [1, 4, 9, 16]
#reduce takes 2 items at once as input from sequence. it makes an inverted tree
from functools import reduce
list2 = [1,2,3,4,5,6,7,8,9,10]
reduce(lambda x,y: x+y, list2) # 55
list2 = [1,2,3,4,5,6,7,8,9,10]
@koushikmln
koushikmln / lambda.py
Last active Jul 6, 2018
Lambda Functions Examples
View lambda.py
fun = lambda x : x*x
fun(2) # Will return 2*2 i.e. 4
def sumOfInt(n,fun):
sum = 0
for i in range(0,n+1):
sum = sum + fun(i)
return sum
sumOfInt(5,lambda x: x*x) # Will return sum of squares till 5 i.e. 55
View application.properties
dev.zookeeper = localhost:2181
dev.bootstrap.server = localhost:9092
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
dev.window = 20
prod.zookeeper = mapr02.itversity.com:5181,mapr03.itversity.com:5181,mapr04.itversity.com:5181
prod.zookeeper.quorum = mapr02.itversity.com,mapr03.itversity.com,mapr04.itversity.com
prod.zookeeper.port = 5181
prod.bootstrap.server = mapr02.itversity.com:9092,mapr03.itversity.com:9092,mapr04.itversity.com:9092
View HBaseFunctions.scala
def getHbaseConnection(config: Config): Connection ={
//Create Hbase Configuration Object
val hBaseConf: Configuration = HBaseConfiguration.create()
hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
hBaseConf.set("zookeeper.znode.parent","/hbase-unsecure")
hBaseConf.set("hbase.cluster.distributed","true")
//Establish Connection
val connection = ConnectionFactory.createConnection(hBaseConf)
connection
View KafkaDemoSparkCode.scala
val conf = ConfigFactory.load
val envProps: Config = conf.getConfig(args(0))
val sparkConf = new SparkConf().setMaster("yarn").setAppName("SiteTraffic")
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window")))
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
val topicsSet = Set("retail_logs")
val now = Calendar.getInstance().getTime()
val timestamp = streamingContext.sparkContext.broadcast(now)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> envProps.getString("bootstrap.server"),