Get inactive customers using left outer join between orders and customers using Data Frames and SQL
/*
spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 2G
*/
import scala.io.Source

// Read the local files on the driver and distribute them as RDDs
val ordersRaw = Source.fromFile("/data/retail_db/orders/part-00000").getLines.toList
val ordersRDD = sc.parallelize(ordersRaw)
val customersRaw = Source.fromFile("/data/retail_db/customers/part-00000").getLines.toList
val customersRDD = sc.parallelize(customersRaw)
// Keep only the customer id from each order record
val ordersDF = ordersRDD.
  map(o => o.split(",")(2).toInt).
  toDF("order_customer_id")
// Keep id, first name and last name from each customer record
val customersDF = customersRDD.
  map(c => (c.split(",")(0).toInt, c.split(",")(1), c.split(",")(2))).
  toDF("customer_id", "customer_fname", "customer_lname")
// Register temp tables so the join can be expressed in SQL
ordersDF.registerTempTable("orders_dg")
customersDF.registerTempTable("customers_dg")
// Reduce shuffle partitions for this small dataset
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
sqlContext.
sql("select customer_lname, customer_fname " +
"from customers_dg left outer join orders_dg " +
"on customer_id = order_customer_id " +
"where order_customer_id is null " +
"order by customer_lname, customer_fname").
rdd.
map(rec => rec.mkString(", ")).
saveAsTextFile("/user/dgadiraju/solutions/solutions02/inactive_customers")
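The query keeps exactly the customers whose join key finds no match in `orders_dg` (the `order_customer_id is null` filter after the left outer join). The same anti-join logic can be sketched with plain Scala collections; the sample names and ids below are made up for illustration:

```scala
// Plain-Scala sketch of the anti-join: keep customers whose id never
// appears among the order customer ids (sample data is hypothetical).
val customers = List((1, "Mary", "Smith"), (2, "Ann", "Jones"), (3, "Bob", "Lee"))
val orderCustomerIds = Set(1, 3)
val inactive = customers.
  filterNot { case (id, _, _) => orderCustomerIds.contains(id) }.
  map { case (_, fname, lname) => s"$lname, $fname" }.
  sorted
// inactive contains only the customer with no order: "Jones, Ann"
```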
@himanish532

Sir, this solution is not working for me.

I'm getting the error below:
ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 217, wn04.itversity.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2
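A `ClassNotFoundException` on `$iwC...$anonfun` classes typically means the anonymous functions generated by the shell were not available on the executors. One common trigger when running with `--master yarn` is that `Source.fromFile` reads a path local to the gateway node, so the data (and the closures built over it) never cleanly reach the cluster. Assuming the same dataset also exists in HDFS at these paths, one workaround is to let Spark read it directly instead of parallelizing a locally-read list; a sketch, not a definitive fix:

```scala
// Sketch, assuming /data/retail_db/* is also present in HDFS.
// sc.textFile distributes the read across executors, avoiding the
// driver-local Source.fromFile step entirely.
val ordersDF = sc.textFile("/data/retail_db/orders").
  map(o => o.split(",")(2).toInt).
  toDF("order_customer_id")
val customersDF = sc.textFile("/data/retail_db/customers").
  map(c => (c.split(",")(0).toInt, c.split(",")(1), c.split(",")(2))).
  toDF("customer_id", "customer_fname", "customer_lname")
```

If the error persists, restarting the spark-shell session so the generated closure classes are re-shipped is often worth trying.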

@kumarsandeep8092

Spark with Scala Solution

val cust = spark.read.option("sep", ",").csv("/data/retail_db/customers").
  select(col("_c0").as("customer_id"), col("_c1").as("fname"), col("_c2").as("lname"))
val orders = spark.read.option("sep", ",").csv("/data/retail_db/orders").
  toDF("order_id", "order_date", "customer_id", "order_status")

// left_anti keeps only customers with no matching order; join on the
// shared column name "customer_id"
cust.join(orders, Seq("customer_id"), "left_anti").
  select("lname", "fname").
  orderBy("lname", "fname").
  map(x => x.mkString(",")).
  coalesce(1).
  write.option("compression", "none").
  text("/user/cloudera/cca175-exercise-sol/solutions02/inactive_customers")
