/**
  * Created by itversity on 17/03/17.
  */
/* build.sbt
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"
/**
  * Created by itversity on 17/03/17.
  * This is primarily to get the word count on the data received from
  * nc -lk 19999
  * Make sure build.sbt is updated with the dependency -
  * libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
  * Create the jar, ship the jar, start nc, and then use spark-submit:
  * spark-submit --class SparkStreamingWordCount --master yarn --conf spark.ui.port=14562 retail_2.10-1.0.jar
  */
import org.apache.spark.SparkConf
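The streaming program itself is cut off here, but its core word-count transformation (flatMap over lines, pair each word with 1, reduce by key) can be sketched on plain Scala collections. The sample lines below are illustrative, not from the original file:

```scala
// Word-count pipeline as applied to each DStream batch in the streaming job,
// demonstrated on an ordinary Scala List (groupBy + sum stands in for
// reduceByKey on RDDs/DStreams).
val lines = List("hello world", "hello spark")
val wordCounts = lines
  .flatMap(_.split(" "))                                      // split lines into words
  .map(word => (word, 1))                                     // pair each word with 1
  .groupBy(_._1)                                              // group pairs by word
  .map { case (word, pairs) => word -> pairs.map(_._2).sum }  // sum counts per word
println(wordCounts)
```

In the real job the same chain runs on the DStream returned by `ssc.socketTextStream`, with `reduceByKey(_ + _)` in place of the groupBy/sum step.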
// Use the HDFS path on the lab, or the local path on your PC
val path = "/public/retail_db"
// val path = "/Users/itversity/Research/data/retail_db"
val rdd = sc.textFile(path + "/orders")
// Record with the smallest third field (customer id) using reduce
rdd.reduce((agg, ele) => {
  if (agg.split(",")(2).toInt < ele.split(",")(2).toInt) agg else ele
})
rdd.top(2)
// Top 5 records by customer id, descending
rdd.takeOrdered(5)(Ordering[Int].reverse.on(x => x.split(",")(2).toInt)).foreach(println)
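To see what the reduce comparator does, here is the same logic applied to a few made-up order lines (the sample records are illustrative; in retail_db the third field of orders is the customer id):

```scala
// reduce keeps whichever of the two records has the smaller third field,
// so the final result is the record with the minimum value overall.
val records = List(
  "1,2013-07-25 00:00:00.0,11599,CLOSED",
  "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT",
  "3,2013-07-25 00:00:00.0,12111,COMPLETE")
val minRecord = records.reduce((agg, ele) =>
  if (agg.split(",")(2).toInt < ele.split(",")(2).toInt) agg else ele)
println(minRecord) // the record whose third field is 256
```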
val orders = sc.textFile("/public/retail_db/orders") // On the lab, accessing HDFS
val orders = sc.textFile("/Users/itversity/Research/data/retail_db/orders") // Accessing locally on the PC
// Change to a valid path as per your preference. Make sure the orders directory exists in the path (locally or on HDFS)
orders.take(10).foreach(println)
val completedOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")
val pendingOrders = orders.
  filter(order => {
    val o = order.split(",")
    (o(3).contains("PENDING") || o(3) == "PROCESSING") && o(1).contains("2013-08")
  })
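The same filter predicate can be checked quickly on made-up order lines (the sample records are illustrative; field 1 is the order date, field 3 the status):

```scala
val sample = List(
  "1,2013-08-01 00:00:00.0,11599,PENDING",
  "2,2013-07-25 00:00:00.0,256,PROCESSING",
  "3,2013-08-02 00:00:00.0,12111,COMPLETE")
val pending = sample.filter(order => {
  val o = order.split(",")
  (o(3).contains("PENDING") || o(3) == "PROCESSING") && o(1).contains("2013-08")
})
// record 2 is PROCESSING but from July, record 3 is COMPLETE,
// so only record 1 passes both conditions
println(pending)
```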
# fmp.conf: a multiplex agent to save one copy of data in HDFS and
# another copy streamed to Kafka so that the data can be processed by
# streaming technologies such as Spark Streaming

# Name the components on this agent
fmp.sources = logsource
fmp.sinks = kafkasink hdfssink
fmp.channels = kafkachannel hdfschannel

# Describe/configure the source
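The rest of the agent definition is cut off above; a sketch of how such a replicating (multiplex) agent is typically wired follows, with the log path, broker address, topic, and HDFS path all placeholder assumptions rather than values from the original file:

```properties
# Assumed exec source tailing a log file (path is a placeholder)
fmp.sources.logsource.type = exec
fmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Replicating selector sends every event to both channels
fmp.sources.logsource.selector.type = replicating
fmp.sources.logsource.channels = kafkachannel hdfschannel

# Kafka sink (broker and topic are placeholders)
fmp.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
fmp.sinks.kafkasink.kafka.topic = fmpdemo
fmp.sinks.kafkasink.channel = kafkachannel

# HDFS sink (path is a placeholder)
fmp.sinks.hdfssink.type = hdfs
fmp.sinks.hdfssink.hdfs.path = hdfs://localhost:8020/user/flume/fmp
fmp.sinks.hdfssink.channel = hdfschannel

# In-memory channels
fmp.channels.kafkachannel.type = memory
fmp.channels.hdfschannel.type = memory
```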
sqoop list-databases \
  --connect "jdbc:mysql://nn01.itversity.com:3306" \
  --username retail_dba \
  --password itversity

# Make sure the directory specified in target-dir does not already exist
# Also make sure you replace dgadiraju with the appropriate user name
# For Cloudera or Hortonworks based VM environments you can change nn01.itversity.com
# to the appropriate host name or localhost
sqoop import \
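The import command is truncated here; a typical full invocation, consistent with the notes above about target-dir and the dgadiraju user name, might look like the following (the table name and target path are illustrative, not from the original file):

```sh
sqoop import \
  --connect "jdbc:mysql://nn01.itversity.com:3306/retail_db" \
  --username retail_dba \
  --password itversity \
  --table orders \
  --target-dir /user/dgadiraju/sqoop_import/retail_db/orders
```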
package retail
/**
  * Created by itversity on 20/03/17.
  */
/* build.sbt
name := "retail"
# kandf.conf: Flume and Kafka integration
# Read streaming data from logs and push it to Kafka as the sink

# Name the components on this agent
kandf.sources = logsource
kandf.sinks = ksink
kandf.channels = mchannel

# Describe/configure the source
kandf.sources.logsource.type = exec
# Run every time you login, or update .bash_profile
export KAFKA_HOME=/usr/hdp/2.5.0.0-1245/kafka
export PATH=$PATH:$KAFKA_HOME/bin

# Create topics
kafka-topics.sh --create \
  --zookeeper nn01.itversity.com:2181,nn02.itversity.com:2181,rm01.itversity.com:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic kafkadg
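Once the topic exists, it can be exercised with the console producer and consumer that ship with Kafka (the broker host and port 6667, the HDP default, are assumptions for this cluster):

```sh
# Publish test messages (type lines, Ctrl-C to stop)
kafka-console-producer.sh \
  --broker-list nn01.itversity.com:6667 \
  --topic kafkadg

# Consume them from the beginning, in another terminal
kafka-console-consumer.sh \
  --zookeeper nn01.itversity.com:2181 \
  --topic kafkadg \
  --from-beginning
```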
# flume-logger-hdfs.conf: Read data from logs and write it to both logger and hdfs
# flume command to start the agent (--conf takes the config directory, --conf-file the agent file) -
# flume-ng agent --name a1 --conf /home/dgadiraju/flume_example --conf-file flume-logger-hdfs.conf

# Name the components on this agent
a1.sources = logsource
a1.sinks = loggersink hdfssink
a1.channels = loggerchannel hdfschannel

# Describe/configure the source
a1.sources.logsource.type = exec