/* build.sbt
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
*/
/**
* Created by itversity on 17/03/17.
* This is primarily to get the word count on the data received from
* nc -lk 19999
* Make sure build.sbt is updated with the dependency -
* libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
* Create jar, ship the jar, start nc, and then use spark-submit
* spark-submit --class SparkStreamingWordCount --master yarn --conf spark.ui.port=14562 retail_2.10-1.0.jar
*/
import org.apache.spark.SparkConf
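// The object itself is not included above, so the following is a minimal sketch of
// the word count described in the header, assuming the Spark 1.6 streaming API.
// The hostname and the 10 second batch interval are illustrative assumptions;
// port 19999 matches the nc -lk 19999 command mentioned in the header.
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Read lines from the socket opened by nc -lk 19999 (replace localhost with the host running nc)
    val lines = ssc.socketTextStream("localhost", 19999)
    val wordCounts = lines.
      flatMap(_.split(" ")).
      map(word => (word, 1)).
      reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}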
// Use one of the following paths depending on where the data resides
val path = "/public/retail_db" // On the lab, accessing HDFS
// val path = "/Users/itversity/Research/data/retail_db" // Accessing locally on the PC
val rdd = sc.textFile(path + "/orders")
// Record with the smallest value in the third field, using reduce
rdd.reduce((agg, ele) => {
  if (agg.split(",")(2).toInt < ele.split(",")(2).toInt) agg else ele
})
// Top 2 records by natural (string) ordering
rdd.top(2)
// 5 records with the largest values in the third field
rdd.takeOrdered(5)(Ordering[Int].reverse.on(x => x.split(",")(2).toInt)).foreach(println)
val orders = sc.textFile("/public/retail_db/orders") // On the lab accessing HDFS
val orders = sc.textFile("/Users/itversity/Research/data/retail_db/orders") // Accessing locally on the PC
// Change to a valid path as per your preference. Make sure the orders directory exists in the path (locally or on HDFS)
orders.take(10).foreach(println)
val completedOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")
// Orders placed in 2013-08 whose status contains PENDING or is PROCESSING
val pendingOrders = orders.
  filter(order => {
    val o = order.split(",")
    (o(3).contains("PENDING") || o(3) == "PROCESSING") && o(1).contains("2013-08")
  })
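// Quick sanity checks on the filtered data
completedOrders.take(10).foreach(println)
pendingOrders.count()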
# fmp.conf: a multiplexing agent to save one copy of the data in HDFS and
# stream the other copy to Kafka so that it can be processed by
# streaming technologies such as Spark Streaming
# Name the components on this agent
fmp.sources = logsource
fmp.sinks = kafkasink hdfssink
fmp.channels = kafkachannel hdfschannel
# Describe/configure the source
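# The remainder of this agent is a sketch: the tailed log file, broker list, topic,
# and HDFS path are placeholder values, and the Kafka sink properties assume the
# Flume 1.6 style KafkaSink (brokerList/topic)
fmp.sources.logsource.type = exec
fmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Replicate every event to both channels so each sink gets its own copy
fmp.sources.logsource.selector.type = replicating

# Describe the sinks
fmp.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.kafkasink.brokerList = nn01.itversity.com:6667
fmp.sinks.kafkasink.topic = fmpdg
fmp.sinks.hdfssink.type = hdfs
fmp.sinks.hdfssink.hdfs.path = hdfs://nn01.itversity.com:8020/user/dgadiraju/flume_multiplex
fmp.sinks.hdfssink.hdfs.fileType = DataStream

# Use a memory channel per sink
fmp.channels.kafkachannel.type = memory
fmp.channels.kafkachannel.capacity = 1000
fmp.channels.kafkachannel.transactionCapacity = 100
fmp.channels.hdfschannel.type = memory
fmp.channels.hdfschannel.capacity = 1000
fmp.channels.hdfschannel.transactionCapacity = 100

# Bind the source and sinks to the channels
fmp.sources.logsource.channels = kafkachannel hdfschannel
fmp.sinks.kafkasink.channel = kafkachannel
fmp.sinks.hdfssink.channel = hdfschannel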
sqoop list-databases \
--connect "jdbc:mysql://nn01.itversity.com:3306" \
--username retail_dba \
--password itversity
# Make sure the directory specified in target-dir does not already exist
# Also make sure you replace dgadiraju with an appropriate user name
# For Cloudera or Hortonworks based VM environments you can change nn01.itversity.com
# to an appropriate host name or localhost
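# The arguments that follow the command below are a sketch to complete the import;
# the table name and target-dir are placeholder values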
sqoop import \
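--connect "jdbc:mysql://nn01.itversity.com:3306/retail_db" \
--username retail_dba \
--password itversity \
--table orders \
--target-dir /user/dgadiraju/sqoop_import/retail_db/orders
# orders and the target-dir above are placeholders - adjust as needed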
package retail
/**
* Created by itversity on 20/03/17.
*/
/* build.sbt
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"
*/
# kandf.conf: Flume and Kafka integration
# Read streaming data from logs and push it to Kafka through a Kafka sink
# Name the components on this agent
kandf.sources = logsource
kandf.sinks = ksink
kandf.channels = mchannel
# Describe/configure the source
kandf.sources.logsource.type = exec
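# The rest of this agent is a sketch: the tailed log file and broker list are
# placeholder values, and the Kafka sink properties assume the Flume 1.6 style
# KafkaSink (brokerList/topic); the topic matches the one created below
kandf.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Kafka sink
kandf.sinks.ksink.type = org.apache.flume.sink.kafka.KafkaSink
kandf.sinks.ksink.brokerList = nn01.itversity.com:6667
kandf.sinks.ksink.topic = kafkadg

# Memory channel
kandf.channels.mchannel.type = memory
kandf.channels.mchannel.capacity = 1000
kandf.channels.mchannel.transactionCapacity = 100

# Bind the source and sink to the channel
kandf.sources.logsource.channels = mchannel
kandf.sinks.ksink.channel = mchannel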
# Run these every time you log in, or add them to .bash_profile
export KAFKA_HOME=/usr/hdp/2.5.0.0-1245/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# Create topics
kafka-topics.sh --create \
--zookeeper nn01.itversity.com:2181,nn02.itversity.com:2181,rm01.itversity.com:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafkadg
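# Optional validation of the topic; the broker host:port is an assumption
# (6667 is the default Kafka broker port on HDP)
kafka-topics.sh --describe \
--zookeeper nn01.itversity.com:2181,nn02.itversity.com:2181,rm01.itversity.com:2181 \
--topic kafkadg
# Publish a few test messages and read them back
kafka-console-producer.sh --broker-list nn01.itversity.com:6667 --topic kafkadg
kafka-console-consumer.sh --zookeeper nn01.itversity.com:2181 --topic kafkadg --from-beginning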
# flume-logger-hdfs.conf: Read data from logs and write it to both a logger sink and HDFS
# Flume command to start the agent:
# flume-ng agent --name a1 --conf /home/dgadiraju/flume_example --conf-file /home/dgadiraju/flume_example/flume-logger-hdfs.conf
# Name the components on this agent
a1.sources = logsource
a1.sinks = loggersink hdfssink
a1.channels = loggerchannel hdfschannel
# Describe/configure the source
a1.sources.logsource.type = exec
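# The rest of this agent is a sketch: the tailed log file and the HDFS path are
# placeholder values
a1.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Logger sink (writes events to the Flume log) and HDFS sink
a1.sinks.loggersink.type = logger
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://nn01.itversity.com:8020/user/dgadiraju/flume_demo
a1.sinks.hdfssink.hdfs.fileType = DataStream

# Use a memory channel per sink
a1.channels.loggerchannel.type = memory
a1.channels.loggerchannel.capacity = 1000
a1.channels.loggerchannel.transactionCapacity = 100
a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity = 1000
a1.channels.hdfschannel.transactionCapacity = 100

# Bind the source and sinks to the channels
a1.sources.logsource.channels = loggerchannel hdfschannel
a1.sinks.loggersink.channel = loggerchannel
a1.sinks.hdfssink.channel = hdfschannel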