hadoop fs -put /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Set Block Size (67108864 bytes = 64 MB); -f overwrites the copy made by the previous command
hadoop fs -D dfs.blocksize=67108864 -put -f /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Set Replication Factor and Block Size
hadoop fs -D dfs.blocksize=67108864 -D dfs.replication=1 -put -f /data/retail_db/order_items/part-00000 /user/koushikmln/retail_db_order_items.csv
#Get File Metadata (block count, replication, health); add -files -blocks -locations for per-block detail
hdfs fsck /user/koushikmln/retail_db_order_items.csv
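For reference, 67108864 bytes is 64 MiB (64 * 1024 * 1024). HDFS stores a file as a sequence of blocks of at most dfs.blocksize bytes, so the block count that hdfs fsck reports should equal the file size divided by the block size, rounded up, and each block is stored dfs.replication times. A minimal Python sketch of that arithmetic; the helper names and the 150 MiB file size are hypothetical, not the real size of part-00000:

```python
BLOCK_SIZE = 67108864  # value passed via -D dfs.blocksize, i.e. 64 MiB

def hdfs_block_count(file_size_bytes, block_size=BLOCK_SIZE):
    """Number of HDFS blocks a file occupies; the last block may be partial."""
    # integer ceiling division, so an empty file gives 0 blocks
    return (file_size_bytes + block_size - 1) // block_size

def raw_storage_bytes(file_size_bytes, replication):
    """Bytes stored cluster-wide: every block is copied `replication` times,
    and a partial last block only uses its actual size, not a full block."""
    return file_size_bytes * replication

# Hypothetical 150 MiB file, not the actual size of part-00000.
size = 150 * 1024 * 1024
print(hdfs_block_count(size))      # 3 (64 MiB + 64 MiB + 22 MiB)
print(raw_storage_bytes(size, 1))  # 157286400, with -D dfs.replication=1
```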
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
#Problem Statement 1
#Get (order_id, sub_total) tuples from the order items csv using the map function.
def getOrderItemTuples(order_items):
    # field 1 is order_id, field 4 is sub_total
    return list(map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])), order_items))

with open("/data/retail_db/order_items/part-00000", "r") as f:
    order_items = f.read().splitlines()
getOrderItemTuples(order_items[:20])
#Problem Statement 2
#Get the total amount for a particular order using map, reduce and filter.
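Problem Statement 2 has no plain-Python solution in this file (the Spark version follows). A sketch using map, filter and reduce, assuming the same column layout as getOrderItemTuples (order_id at index 1, sub_total at index 4); getOrderTotal and the sample rows are hypothetical:

```python
from functools import reduce

def getOrderTotal(order_items, order_id):
    # map: each CSV line -> (order_id, sub_total)
    tuples = map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])), order_items)
    # filter: keep only the rows that belong to the requested order
    matching = filter(lambda t: t[0] == order_id, tuples)
    # map to the sub_totals, then reduce them to one sum;
    # the initial value 0.0 avoids a TypeError when no rows match
    return reduce(lambda a, b: a + b, map(lambda t: t[1], matching), 0.0)

# Hypothetical sample rows: order_item_id, order_id, product_id,
# quantity, sub_total, product_price.
sample = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.0,50.0",
    "4,2,403,1,129.99,129.99",
]
print(getOrderTotal(sample, 2))
```

On the real file, getOrderTotal(order_items, 2) would be called on the list read for Problem Statement 1.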
# Use map to create an rdd of (order_id, sub_total) tuples (sc is the SparkContext provided by the pyspark shell).
rdd = sc.textFile("/public/retail_db/order_items/part-00000")
orderItemTuple = rdd.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
orderItemTuple.take(10)
# Get the total for a particular order_id (here 2); returns an (order_id, total) tuple
orderItemTuple.filter(lambda x: x[0] == 2).reduce(lambda x, y: (x[0], x[1] + y[1]))
# Get an (order_id, total) tuple for every order
orderItemTuple.reduceByKey(lambda x, y: x + y).take(10)
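reduceByKey merges all values that share a key with the given function, producing one (key, value) pair per key. The same result can be computed locally with a dict, which is handy for checking a few rows by hand. This is a plain-Python illustration of the semantics only, not how Spark executes it (Spark also combines values inside each partition before the shuffle); reduce_by_key and the sample pairs are hypothetical:

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        # the first value seen for a key is stored as-is; later ones are folded in
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())

pairs = [(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99)]
print(reduce_by_key(pairs, lambda x, y: x + y))
```

Because Spark does not guarantee the order in which values are combined, the function passed to reduceByKey should be associative and commutative, as addition is.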
rdd = sc.textFile("/public/retail_db/orders/part-00000")
# Count orders per order_status (4th field); the parentheses let the chain span several lines
status_count = (rdd.map(lambda x: (x.split(",")[3], 1))
                .reduceByKey(lambda a, b: a + b)
                .collect())
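To sanity-check the status counts on a few rows without a cluster, the same per-status count can be computed locally with collections.Counter. statusCount and the sample rows are hypothetical and assume order_status is the fourth field, as in the code above:

```python
from collections import Counter

def statusCount(order_lines):
    # order_status is field index 3, matching x.split(",")[3] above
    return Counter(line.split(",")[3] for line in order_lines)

# Hypothetical sample rows: order_id, order_date, order_customer_id, order_status.
sample = [
    "1,2013-07-25 00:00:00.0,11599,CLOSED",
    "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT",
    "3,2013-07-25 00:00:00.0,12111,COMPLETE",
    "4,2013-07-25 00:00:00.0,8827,CLOSED",
]
print(sorted(statusCount(sample).items()))
# [('CLOSED', 2), ('COMPLETE', 1), ('PENDING_PAYMENT', 1)]
```

On the RDD itself, rdd.map(lambda x: x.split(",")[3]).countByValue() is an alternative that returns the counts to the driver as a dictionary; like collect(), it is only appropriate when the number of distinct statuses is small.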
#map takes a function and an iterable, applies the function to each element lazily and returns a map object (an iterator); wrap it in list() to see the values
list1 = [1,2,3,4]
list(map(lambda x:x*x,list1)) # [1, 4, 9, 16]
#reduce applies a two-argument function cumulatively from left to right, folding the sequence into a single value: ((1+2)+3)+...
from functools import reduce
list2 = [1,2,3,4,5,6,7,8,9,10]
reduce(lambda x,y: x+y, list2) # 55
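Because reduce folds left to right rather than combining items as a tree, the grouping of its calls can be made visible by reducing strings instead of numbers, or by using a non-associative operator such as subtraction. A short illustration:

```python
from functools import reduce

# Build a string that records how reduce groups its calls.
trace = reduce(lambda x, y: f"({x}+{y})", ["1", "2", "3", "4"])
print(trace)  # (((1+2)+3)+4)

# With subtraction the left-to-right order changes the result: (10-2)-3
print(reduce(lambda x, y: x - y, [10, 2, 3]))  # 5
```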
fun = lambda x : x*x
fun(2) # Will return 2*2 i.e. 4
def sumOfInt(n,fun):
    total = 0  # named total to avoid shadowing the built-in sum()
    for i in range(0,n+1):
        total = total + fun(i)
    return total
sumOfInt(5,lambda x: x*x) # Will return the sum of squares from 0 to 5 i.e. 55
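The explicit loop in sumOfInt can also be written with map and the built-in sum, which makes the higher-order pattern more direct: the function argument is applied to every integer from 0 to n and the results are added. sumOfIntMap is a hypothetical name chosen so it does not clash with the version above:

```python
def sumOfIntMap(n, fun):
    """Sum fun(i) for i in 0..n inclusive, same contract as sumOfInt."""
    return sum(map(fun, range(0, n + 1)))

print(sumOfIntMap(5, lambda x: x * x))   # 0+1+4+9+16+25 = 55
print(sumOfIntMap(5, lambda x: x))       # 15
print(sumOfIntMap(3, lambda x: x ** 3))  # 0+1+8+27 = 36
```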
dev.zookeeper = localhost:2181
dev.bootstrap.server = localhost:9092
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
dev.window = 20
prod.zookeeper = mapr02.itversity.com:5181,mapr03.itversity.com:5181,mapr04.itversity.com:5181
prod.zookeeper.quorum = mapr02.itversity.com,mapr03.itversity.com,mapr04.itversity.com
prod.zookeeper.port = 5181
prod.bootstrap.server = mapr02.itversity.com:9092,mapr03.itversity.com:9092,mapr04.itversity.com:9092
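The dev. and prod. prefixes let one file serve both environments: the Scala job calls conf.getConfig(args(0)), so with dev as the argument, envProps.getString("bootstrap.server") resolves dev.bootstrap.server. A simplified Python stand-in for that prefix lookup; it only handles flat key = value lines, not the full Typesafe Config syntax, and get_env_config is a hypothetical helper:

```python
def get_env_config(text, env):
    """Return {key: value} for lines starting with '<env>.', with the prefix stripped."""
    prefix = env + "."
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # skip blank lines, comments and anything that is not key = value
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if key.startswith(prefix):
            props[key[len(prefix):]] = value
    return props

conf_text = """
dev.zookeeper = localhost:2181
dev.bootstrap.server = localhost:9092
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
dev.window = 20
"""
dev = get_env_config(conf_text, "dev")
print(dev["bootstrap.server"])  # localhost:9092
print(int(dev["window"]))       # 20
```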
import java.util.Calendar
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// args(0) selects the environment block ("dev" or "prod") from the config file
val conf = ConfigFactory.load
val envProps: Config = conf.getConfig(args(0))
val sparkConf = new SparkConf().setMaster("yarn").setAppName("SiteTraffic")
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window")))
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
val topicsSet = Set("retail_logs")
val now = Calendar.getInstance().getTime()
val timestamp = streamingContext.sparkContext.broadcast(now)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> envProps.getString("bootstrap.server"),
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

def getHbaseConnection(config: Config): Connection = {
  //Create HBase Configuration Object
  val hBaseConf: Configuration = HBaseConfiguration.create()
  hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
  hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
  hBaseConf.set("zookeeper.znode.parent","/hbase-unsecure")
  hBaseConf.set("hbase.cluster.distributed","true")
  //Establish Connection
  val connection = ConnectionFactory.createConnection(hBaseConf)
  connection
}