@vikas-gonti
Created June 30, 2018 19:45
Get inactive customers using left outer join between orders and customers.
/*
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir data/retail_db/orders \
--as-textfile \
--num-mappers 1
*/
/*
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table customers \
--target-dir data/retail_db/customers \
--as-textfile \
--num-mappers 1
*/
/* Data is available in local file system /data/retail_db
Source directories: /data/retail_db/orders and /data/retail_db/customers
Source delimiter: comma (“,”)
Source Columns - orders - order_id, order_date, order_customer_id, order_status
Source Columns - customers - customer_id, customer_fname, customer_lname and many more
Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
Target Columns: customer_lname, customer_fname
Number of files - 1
Target Directory: /user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers
Target File Format: TEXT
Target Delimiter: comma (“, ”)
Compression: N/A */
/*
spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 2G
*/
val orders = sc.textFile("data/retail_db/orders")
val customers = sc.textFile("data/retail_db/customers")
// Split each record once; keep only the columns needed for the join.
val ordersRDD = orders.map(rec => {
  val r = rec.split(",")
  (r(0), r(2))
})
val customersRDD = customers.map(rec => {
  val r = rec.split(",")
  (r(0), r(1), r(2))
})
val ordersDF = ordersRDD.toDF("order_id","order_customer_id")
val customersDF = customersRDD.toDF("customer_id","customer_fname","customer_lname")
// Approach 1: Spark SQL through sqlContext
ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")
val sqlResult = sqlContext.sql("select customer_lname, customer_fname "+
"from customers c left outer join orders o on "+
"o.order_customer_id = c.customer_id "+
"where o.order_customer_id is null "+
"order by c.customer_lname, c.customer_fname")
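// Write one text file to the absolute target directory, using the ", " delimiter given in the task.
sqlResult.coalesce(1).rdd.map(rec => rec.mkString(", ")).
  saveAsTextFile("/user/gontiv/solutions/solutions02/inactive_customers")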
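// Approach 2: core RDD API
// Key orders by order_customer_id so they can be joined to customers on customer_id.
val ordersByCustomer = orders.map(rec => {
  val r = rec.split(",")
  (r(2), r(0))
})
// (customer_id, (customer_lname, customer_fname))
val customersById = customers.map(rec => {
  val r = rec.split(",")
  (r(0), (r(2), r(1)))
})
val leftJoinData = customersById.leftOuterJoin(ordersByCustomer)
// Keep customers with no matching order, then sort by (customer_lname, customer_fname).
val inactive = leftJoinData.filter(rec => rec._2._2.isEmpty).map(rec => rec._2).sortByKey()
inactive.map(rec => rec._1._1 + ", " + rec._1._2).coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solutions02_core/inactive_customers")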
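/*
A minimal plain-Scala sketch of the same "left outer join, keep the unmatched rows" logic,
runnable without Spark or the retail_db data. The sample records and the names
sampleCustomers, sampleOrders, ordersByCustomerId and inactiveSample are made up for
illustration; they are not part of retail_db or of the solutions above.
*/
// (customer_id, customer_fname, customer_lname)
val sampleCustomers = Seq(("1", "Mary", "Smith"), ("2", "Ann", "Jones"), ("3", "Bob", "Smith"))
// (order_id, order_customer_id)
val sampleOrders = Seq(("100", "2"), ("101", "2"))
// Group orders by customer id. A lookup returns None for a customer without orders,
// which plays the role of the Option on the right side of leftOuterJoin.
val ordersByCustomerId = sampleOrders.groupBy(_._2)
val inactiveSample = sampleCustomers.
  map { case (id, fname, lname) => ((lname, fname), ordersByCustomerId.get(id)) }.
  filter(_._2.isEmpty).
  map(_._1).
  sorted.
  map { case (lname, fname) => lname + ", " + fname }
inactiveSample.foreach(println)
// prints:
// Smith, Bob
// Smith, Mary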