Solutions to 20 questions at http://nn02.itversity.com/cca175/
:'#Problem 1
#Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
Output Requirements
#Place the customer files in the HDFS directory
#/user/yourusername/problem1/solution/
#Replace yourusername with your OS user name
#Use a text format with comma as the columnar delimiter
#Load every order record completely'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/vikasgonti/itversity/problem1/solution/
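// Optional check (not part of the required solution): read the imported files back from
// spark-shell to confirm they are comma-delimited text; the path assumes the --target-dir used above.
sc.textFile("/user/vikasgonti/itversity/problem1/solution/").take(5).foreach(println)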
:'#Problem 2
#Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
Output Requirements
#Target Columns: customer_lname, customer_fname
#Number of Files: 1
#Place the output file in the HDFS directory
#/user/yourusername/problem2/solution/
#Replace yourusername with your OS user name
#File format should be text
#delimiter is (",")
#Compression: Uncompressed
'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--target-dir /user/vikasgonti/itversity/problem2/solution/ \
-m 1 \
--query "select customer_lname, customer_fname from customers left outer join orders on customer_id = order_customer_id where \$CONDITIONS and order_customer_id is null order by customer_lname, customer_fname"
:'#Problem 3
#Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
Output Requirements
#Output Fields: crime_type, incident_count
#Output File Format: JSON
#Delimiter: N/A
#Compression: No
#Place the output file in the HDFS directory
#/user/yourusername/problem3/solution/
#Replace yourusername with your OS user name'
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDatawithoutHeader = crimeData.filter(rec => rec!=crimeDataHeader)
crimeDatawithoutHeader.take(10).foreach(println)
val crimeRDD = crimeDatawithoutHeader.map(rec => {
val t = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
(t(5),t(7))
})
val crimeDF = crimeRDD.toDF("type","location")
crimeDF.registerTempTable("crime");
val res = sqlContext.sql("select * from (select type as crime_type, count(1) as incident_count from crime "+
"where location = 'RESIDENCE' group by type order by incident_count desc) A limit 3");
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem3/solution/")
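// Optional check: read the JSON output back to confirm the crime_type and incident_count
// fields; a sketch assuming the solution path used above.
sqlContext.read.json("/user/vikasgonti/itversity/problem3/solution/").show()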
:'#Problem 4
#Convert NYSE data into parquet
Output Requirements
#Column Names: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Convert file format to parquet
#Place the output file in the HDFS directory
#/user/yourusername/problem4/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/user/vikasgonti/data/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6))
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
nyseDF.write.parquet("/user/vikasgonti/itversity/problem4/solution/")
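// Optional check: read the parquet output back and confirm the required column names;
// a sketch assuming the solution path used above.
val nyseCheck = sqlContext.read.parquet("/user/vikasgonti/itversity/problem4/solution/")
nyseCheck.printSchema
nyseCheck.show(5)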
:'#problem 5
#Get word count for the input data using space as delimiter
Output Requirements
#Output File format: Avro
#Output fields: word, count
#Compression: Uncompressed
#Place the customer files in the HDFS directory
#/user/yourusername/problem5/solution/
#Replace yourusername with your OS user name'
spark-shell --master yarn \
--conf spark.ui.port=12456 \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1
val wordsRDD = sc.textFile("/public/randomtextwriter")
val wordsFlat = wordsRDD.flatMap(rec => rec.split(" "))
val wordsMap = wordsFlat.map(rec => (rec,1))
val wordsCount = wordsMap.reduceByKey((t,v) => t+v,8)
val wordsDF = wordsCount.toDF("word","count")
import com.databricks.spark.avro._
wordsDF.write.avro("/user/vikasgonti/itversity/problem5/solution/")
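// Optional check: the same spark-avro import also provides read.avro, so the output can be
// read back (a sketch; assumes spark-shell was started with the --packages option shown above).
sqlContext.read.avro("/user/vikasgonti/itversity/problem5/solution/").show(5)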
:'#problem 6
#Get total number of orders for each customer where the customer_state = 'TX'
Output Requirements
#Output Fields: customer_fname, customer_lname, order_count
#File Format: text
#Delimiter: Tab character (\t)
#Place the result file in the HDFS directory
#/user/yourusername/problem6/solution/
#Replace yourusername with your OS user name'
val orders = sc.textFile("/public/retail_db/orders")
val customers = sc.textFile("/public/retail_db/customers")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(2))
}).toDF("order_id","order_customer_id")
val customersDF = customers.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2), t(7))
}).toDF("customer_id","customer_fname","customer_lname","customer_state")
ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")
val res = sqlContext.sql("select customer_fname, customer_lname, count(order_id) order_count "+
"from customers, orders where customer_id = order_customer_id and customer_state = 'TX' "+
"group by customer_fname, customer_lname ")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem6/solution/")
:'#problem 7
#List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
Output Requirements
#Target Columns: order_date, order_revenue, product_name, product_category_id
#Data has to be sorted in descending order by order_revenue
#File Format: text
#Delimiter: colon (:)
#Place the output file in the HDFS directory
#/user/yourusername/problem7/solution/
#Replace yourusername with your OS user name
'
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val products = sc.textFile("/public/retail_db/products")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(1).split(" ")(0), t(3))
}).toDF("order_id","order_date","order_status")
val orderItemsDF = orderItems.map(rec => {
val t = rec.split(",")
(t(1),t(2), t(4))
}).toDF("order_item_order_id","order_item_product_id","order_item_subtotal")
val productsDF = products.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2))
}).toDF("product_id","product_category_id","product_name")
ordersDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
productsDF.registerTempTable("products")
val res = sqlContext.sql("select DISTINCT order_date, round(sum(order_item_subtotal) over (partition by product_id), 2) order_revenue, "+
"product_name,product_category_id from orders, products, orderItems where order_id = order_item_order_id "+
"and order_item_product_id = product_id and order_date ='2013-07-26' and order_status IN ('COMPLETE','CLOSED') "+
"order by order_revenue desc limit 5")
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem7/solution/")
:'#problem 8
#List the orders where the order_status = PENDING_PAYMENT, ordered by order_id
Output Requirements
#Target columns: order_id, order_date, order_customer_id, order_status
#File Format: orc
#Place the output files in the HDFS directory
#/user/yourusername/problem8/solution/
#Replace yourusername with your OS user name'
val ordersRDD = sc.textFile("/public/retail_db/orders")
val ordersDF = ordersRDD.map(rec => {
val t = rec.split(",")
(t(0).toInt,t(1), t(2), t(3))
}).toDF("order_id","order_date","order_customer_id","order_status")
val res = ordersDF.filter("order_status = 'PENDING_PAYMENT'").orderBy("order_id")
res.write.orc("/user/vikasgonti/itversity/problem8/solution/")
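// Optional check: read the ORC output back (assumes sqlContext is a HiveContext, as it is
// in the labs spark-shell) and confirm the ordering by order_id.
sqlContext.read.orc("/user/vikasgonti/itversity/problem8/solution/").show(5)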
:'#problem 9
#Remove header from h1b data
Output Requirements
#Remove the header from the data and save rest of the data as is
#Data should be compressed using snappy algorithm
#Place the H1B data in the HDFS directory
#/user/yourusername/problem9/solution/
#Replace yourusername with your OS user name
'
val h1bdata = sc.textFile("/public/h1b/h1b_data")
val h1bHeader = h1bdata.first
val h1bdatawithoutheader = h1bdata.filter(rec => rec!=h1bHeader)
h1bdatawithoutheader.saveAsTextFile("/user/vikasgonti/itversity/problem9/solution/",classOf[org.apache.hadoop.io.compress.SnappyCodec])
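// Optional check: Snappy-compressed text is read back transparently through sc.textFile
// (a sketch; assumes the Snappy codec is available on the cluster).
sc.textFile("/user/vikasgonti/itversity/problem9/solution/").take(3).foreach(println)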
:'#problem 10
#Get number of LCAs filed for each year
Output Requirements
#File Format: text
#Output Fields: YEAR, NUMBER_OF_LCAS
#Delimiter: Ascii null "\0"
#Place the output files in the HDFS directory
#/user/yourusername/problem10/solution/
#Replace yourusername with your OS user name'
# ID CASE_STATUS EMPLOYER_NAME SOC_NAME JOB_TITLE FULL_TIME_POSITION PREVAILING_WAGE YEAR WORKSITE lon lat
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(7))
}).toDF("YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(1) as NUMBER_OF_LCAS from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\0")).saveAsTextFile("/user/vikasgonti/itversity/problem10/solution/")
:'#problem 11
#Get number of LCAs by status for the year 2016
Output Requirements
#File Format: json
#Output Field Names: year, status, count
#Place the output files in the HDFS directory
#/user/yourusername/problem11/solution/
#Replace yourusername with your OS user name'
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(7))
}).toDF("Status", "Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Year, Status, Count(1) as count from h1bdata where year != 'NA' and year = 2016 group by year, status")
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem11/solution/")
:'#problem 12
#Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
Output Requirements
#File Format: parquet
#Output Fields: employer_name, lca_count
#Data needs to be in descending order by count
#Place the output files in the HDFS directory
#/user/yourusername/problem12/solution/
#Replace yourusername with your OS user name'
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(2),t(7))
}).toDF("Status","Employer","Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Employer employer_name, count(1) lca_count from h1bdata where year != 'NA' and year = 2016 "+
"and status IN ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by Employer order by lca_count desc limit 5")
res.write.parquet("/user/vikasgonti/itversity/problem12/solution/")
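// Optional check: read the parquet output back to confirm the top 5 employers in
// descending order of lca_count; the path assumes the solution directory used above.
sqlContext.read.parquet("/user/vikasgonti/itversity/problem12/solution/").show()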
:'#problem 13
#Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
Output Requirements
#Save it in Hive Database
#Create Database: CREATE DATABASE IF NOT EXISTS yourusername
#Switch Database: USE yourusername
#Save data to hive table h1b_data
#Create table command:
CREATE TABLE h1b_data (
ID INT,
CASE_STATUS STRING,
EMPLOYER_NAME STRING,
SOC_NAME STRING,
JOB_TITLE STRING,
FULL_TIME_POSITION STRING,
PREVAILING_WAGE DOUBLE,
YEAR INT,
WORKSITE STRING,
LONGITUDE STRING,
LATITUDE STRING
)
Replace yourusername with your OS user name'
val h1bdatawithoutheader = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6),t(7),t(8),t(9),t(10))
}).toDF("ID","CASE_STATUS","EMPLOYER_NAME","SOC_NAME","JOB_TITLE","FULL_TIME_POSITION","PREVAILING_WAGE","YEAR","WORKSITE","LONGITUDE","LATITUDE")
sqlContext.sql("use vghivedatabase")
sqlContext.sql("show tables").show
h1bDF.registerTempTable("h1bdata")
sqlContext.sql("insert into h1b_data select * from h1bdata where year != 'NA' and PREVAILING_WAGE!='NA'")
:'#problem 14
#Export h1b data from hdfs to MySQL Database
Output Requirements
#Export data to MySQL Database
#MySQL database is running on ms.itversity.com
#User: h1b_user
#Password: itversity
Database Name: h1b_export
Table Name: h1b_data_yourusername
Nulls are represented as: NA
After export nulls should not be stored as NA in database. It should be represented as database null
Create table command:
CREATE TABLE h1b_data_yourusername (
ID INT,
CASE_STATUS VARCHAR(50),
EMPLOYER_NAME VARCHAR(100),
SOC_NAME VARCHAR(100),
JOB_TITLE VARCHAR(100),
FULL_TIME_POSITION VARCHAR(50),
PREVAILING_WAGE FLOAT,
YEAR INT,
WORKSITE VARCHAR(50),
LONGITUDE VARCHAR(50),
LATITUDE VARCHAR(50));
#Replace yourusername with your OS user name
#Above create table command can be run using
#Login using mysql -u h1b_user -h ms.itversity.com -p
#When prompted enter password itversity
#Switch to database using use h1b_export
#Run above create table command by replacing yourusername with your OS user name'
sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_export \
--username h1b_user \
--password itversity \
--table h1b_data_vg \
--export-dir /public/h1b/h1b_data_to_be_exported \
--input-fields-terminated-by '\001' \
--input-null-string 'NA' \
--input-null-non-string 'NA'
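// Optional check: read the exported table back over JDBC from spark-shell to confirm that NA
// values were stored as database nulls (a sketch; assumes the MySQL JDBC driver jar is on the
// spark-shell classpath and the table name h1b_data_vg used above).
val exported = sqlContext.read.format("jdbc").
option("url", "jdbc:mysql://ms.itversity.com:3306/h1b_export").
option("driver", "com.mysql.jdbc.Driver").
option("dbtable", "h1b_data_vg").
option("user", "h1b_user").
option("password", "itversity").
load()
exported.where("CASE_STATUS is null or YEAR is null").count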
:'#problem 15
#Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem15/solution/
#Replace yourusername with your OS user name
#Use avro file format
#Load only those records which have case_status as CERTIFIED completely
#There are 2615623 such records'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--table h1b_data \
--where "CASE_STATUS = 'CERTIFIED'" \
--target-dir /user/vikasgonti/itversity/problem15/solution/ \
--as-avrodatafile
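// Optional check: the avro files written by sqoop can be read with spark-avro to confirm
// the stated record count of 2615623 (a sketch; assumes spark-shell was started with the
// com.databricks:spark-avro_2.10:2.0.1 package as in problem 5).
import com.databricks.spark.avro._
sqlContext.read.avro("/user/vikasgonti/itversity/problem15/solution/").count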
:'#problem 16
#Get NYSE data in ascending order by date and descending order by volume
Output Requirements
#Save data back to HDFS
#Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#File Format: text
#Delimiter: :
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem16/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
import org.apache.spark.sql.functions.col
val res = nyseDF.orderBy(col("transactiondate"),col("volume").desc)
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem16/solution/")
:'#problem 17
#Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
Output Requirements
#Get unique stock ticker for which corresponding names are missing in NYSE symbols data
#Save data back to HDFS
#File Format: avro
#Avro dependency details:
#groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem17/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString)
}).toDF("stockticker")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString)
}).toDF("stocksymbol")
symDataDF.registerTempTable("symData")
val res = sqlContext.sql("select DISTINCT stockticker as Symbol from nyse n "+
"left outer join symData s on n.stockticker = s.stocksymbol and s.stocksymbol is null")
import com.databricks.spark.avro._
res.write.avro("/user/vikasgonti/itversity/problem17/solution/")
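// Optional check: read the avro output back and look at a few of the tickers that have no
// matching name in the NYSE symbols data; the path assumes the solution directory used above.
sqlContext.read.avro("/user/vikasgonti/itversity/problem17/solution/").show(5)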
:'#problem 18
#Get the name of stocks displayed along with other information
Output Requirements
#Get all NYSE details along with stock name if exists, if not stockname should be empty
#Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Delimiter: ,
#File Format: text
#Place the data in the HDFS directory
#/user/yourusername/problem18/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString, t(1).toString)
}).toDF("stocksymbol", "stockname")
symDataDF.registerTempTable("symData")
val res = sqlContext.sql("select stockticker, nvl(stockname,'') stockname, transactiondate, openprice, "+
"highprice, lowprice, closeprice, volume "+
"from nyse n left outer join symData s on n.stockticker = s.stocksymbol")
res.map(rec => rec.mkString(",")).saveAsTextFile("/user/vikasgonti/itversity/problem18/solution/")
:'#problem 19
#Get number of companies who filed LCAs for each year
Output Requirements
#File Format: text
#Delimiter: tab character "\t"
#Output Field Order: year, lca_count
#Place the output files in the HDFS directory
#/user/yourusername/problem19/solution/
#Replace yourusername with your OS user name'
val h1bdata = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdata.map(rec => {
val t = rec.split("\0")
(t(2),t(7))
}).toDF("EMPLOYER_NAME","YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(EMPLOYER_NAME) as lca_count from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem19/solution/")
:'#problem 20
#Connect to the MySQL database on the itversity labs using sqoop and import data with employer_name, case_status and count.
#Make sure data is sorted by employer_name in ascending order and by count in descending order
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem20/solution/
#Replace yourusername with your OS user name
#Use text file format and tab (\t) as delimiter
#Hint: You can use Spark with JDBC or Sqoop import with query
#You might not get such hints in actual exam
#Output should contain employer name, case status and count
'
sqoop eval \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select count(*) from (select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data group by EMPLOYER_NAME, CASE_STATUS) A"
sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data where \$CONDITIONS group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc" \
--target-dir /user/vikasgonti/itversity/problem20/solution/ \
--as-textfile \
--fields-terminated-by '\t' \
--split-by case_status
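// Optional check: confirm the import produced tab-delimited text sorted by employer_name
// ascending and count descending; the path assumes the --target-dir used above.
sc.textFile("/user/vikasgonti/itversity/problem20/solution/").take(5).foreach(println)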
@codersham

Hi Vikas,

This is regarding your solution to problem #7. I see the following issue in the SQL:
The "limit" clause eliminates products with the same revenue. Instead, dense_rank should have been used to get the top 5 revenue-generating products.
Your result shows the following row 4 times, which is not correct.

order_date,order_revenue,product_name,product_category_id
"2013-07-26 00:00:00.0",10549.47,"insta-bed Neverflat Air Mattress",44
"2013-07-26 00:00:00.0",10549.47,"Field & Stream Sportsman 16 Gun Fire Safe",45
"2013-07-26 00:00:00.0",10549.47,"Field & Stream Sportsman 16 Gun Fire Safe",45
"2013-07-26 00:00:00.0",10549.47,"Field & Stream Sportsman 16 Gun Fire Safe",45
"2013-07-26 00:00:00.0",10549.47,"Field & Stream Sportsman 16 Gun Fire Safe",45

It shouldn't have generated the duplicate rows. The data should have been rolled up to the product name, meaning one product should appear only once in the result.
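A hedged sketch of the dense_rank approach suggested above, reusing the temp tables registered in the problem 7 solution (orders, orderItems, products); it ranks products by total revenue so ties are not cut off by limit and each product appears only once:

val rankedRes = sqlContext.sql("select order_date, order_revenue, product_name, product_category_id from ("+
"select order_date, round(sum(order_item_subtotal), 2) order_revenue, product_name, product_category_id, "+
"dense_rank() over (order by sum(order_item_subtotal) desc) rnk "+
"from orders, orderItems, products where order_id = order_item_order_id "+
"and order_item_product_id = product_id and order_date = '2013-07-26' and order_status IN ('COMPLETE','CLOSED') "+
"group by order_date, product_name, product_category_id) t where rnk <= 5 order by order_revenue desc")
rankedRes.show()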
