/*spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 6 \
--executor-cores 2 \
--executor-memory 2G \
--packages com.databricks:spark-avro_2.10:2.0.1
*/
import com.databricks.spark.avro._;
import org.apache.spark.sql.functions._;
var orderDF = sqlContext.read.avro("user/cloudera/problem1/orders");
var orderItemsDF = sqlContext.read.avro("user/cloudera/problem1/order-items");
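// Optional sanity check (not part of the original solution): confirm the Avro loads
// before joining by inspecting the schemas and a few rows.
orderDF.printSchema()
orderDF.show(5)
orderItemsDF.printSchema()
orderItemsDF.show(5)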
//a. Using the Data Frames API - order_date should be in YYYY-MM-DD format
var joinedOrderDataDF = orderDF.join(orderItemsDF,orderDF("order_id")===orderItemsDF("order_item_order_id"));
var dataFrameResult = joinedOrderDataDF.
  groupBy(to_date(from_unixtime(col("order_date")/1000)).alias("order_formatted_date"),
    col("order_status")).
  agg(round(sum("order_item_subtotal"),2).alias("total_amount"),
    countDistinct("order_id").alias("total_orders")).
  orderBy(col("order_formatted_date").desc, col("order_status"),
    col("total_amount").desc, col("total_orders"));
//Store the result as parquet file into hdfs using gzip compression under folder
//user/cloudera/problem1/result4a-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
dataFrameResult.coalesce(1).write.parquet("user/cloudera/problem1/result4a-gzip")
//Store the result as parquet file into hdfs using snappy compression under folder
//user/cloudera/problem1/result4a-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
dataFrameResult.coalesce(1).write.parquet("user/cloudera/problem1/result4a-snappy")
//Store the result as CSV file into hdfs using No compression under folder
//user/cloudera/problem1/result4a-csv
dataFrameResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4a-csv")
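// Optional verification (assumption: same paths as above): read one of the parquet
// outputs back and compare its row count with the in-memory result; the codec can be
// confirmed from the part-file extensions in HDFS.
val gzipCheck = sqlContext.read.parquet("user/cloudera/problem1/result4a-gzip")
gzipCheck.count() == dataFrameResult.count()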
//b. Using Spark SQL - order_date should be in YYYY-MM-DD format
orderDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
val sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(o.order_date/1000 as bigint))) as order_date, "+
"o.order_status, count(distinct(order_id)) total_orders, "+
"cast(sum(oi.order_item_subtotal) as DECIMAL (10,2)) total_amount "+
"from orders o, orderItems oi "+
"where o.order_id = oi.order_item_order_id "+
"group by to_date(from_unixtime(cast(o.order_date/1000 as bigint))), "+
"o.order_status "+
" order by order_date desc, order_status asc, total_amount desc, total_orders asc")
//Store the result as parquet file into hdfs using gzip compression under folder
//user/cloudera/problem1/result4b-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
sqlResult.coalesce(1).write.parquet("user/cloudera/problem1/result4b-gzip")
//Store the result as parquet file into hdfs using snappy compression under folder
//user/cloudera/problem1/result4b-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlResult.coalesce(1).write.parquet("user/cloudera/problem1/result4b-snappy")
//Store the result as CSV file into hdfs using No compression under folder
//user/cloudera/problem1/result4b-csv
sqlResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4b-csv")
//c. Using the combineByKey function on RDDs - no need to format order_date or total_amount
// key orders by order_id; value is (order_date, order_status)
val orderRDD = orderDF.rdd.map(rec => {
  (rec(0).toString,(rec(1).toString,rec(3).toString))
})
// key order items by order_item_order_id; value is order_item_subtotal
val orderItemRDD = orderItemsDF.rdd.map(rec => {
  (rec(1).toString,rec(4).toString.toFloat)
})
val joinedOrderDataRDD = orderRDD.join(orderItemRDD)
// re-key by (order_date, order_status); the value carries (order_item_subtotal, order_id)
val comByKeyInt = joinedOrderDataRDD.map(rec => {
  (rec._2._1,(rec._2._2,rec._1))
})
// exploratory checks used while debugging (left commented out):
//val intermediateResult = comByKeyInt.filter(rec => rec._1._2 == "SUSPECTED_FRAUD").filter(rec => rec._1._1 == "1377835200000")
//intermediateResult.map(rec => (rec._1,rec._2._1)).reduceByKey((t,v) => t+v).first
//intermediateResult.map(rec => (rec._1,Set(rec._2))).reduceByKey((x,y) => (x._1 + y._1,x._2+y._2)).take(15).foreach(println)
// per (order_date, order_status) key, accumulate (running revenue total, set of order ids);
// the set size gives the distinct order count
val comByKeyResult = comByKeyInt.combineByKey((x)=>(x._1,Set(x._2)),
  (x:(Float,Set[String]),y)=>(x._1 + y._1,x._2+y._2),
  (x:(Float,Set[String]),y:(Float,Set[String]))=>(x._1+y._1,x._2++y._2)).
  map(x=> (x._1._1,x._1._2,x._2._1,x._2._2.size)).toDF().
  orderBy(col("_1").desc,col("_2"),col("_3").desc,col("_4"));
//Store the result as parquet file into hdfs using gzip compression under folder
//user/cloudera/problem1/result4c-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
comByKeyResult.coalesce(1).write.parquet("user/cloudera/problem1/result4c-gzip")
//Store the result as parquet file into hdfs using snappy compression under folder
//user/cloudera/problem1/result4c-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
comByKeyResult.coalesce(1).write.parquet("user/cloudera/problem1/result4c-snappy")
//Store the result as CSV file into hdfs using No compression under folder
//user/cloudera/problem1/result4c-csv
comByKeyResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4c-csv")
# CCA 175 (Arun's problem scenario) - sqoop imports used to stage the source data
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir user/cloudera/problem1/orders \
--as-avrodatafile \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--num-mappers 1
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table order_items \
--target-dir user/cloudera/problem1/order-items \
--as-avrodatafile \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--num-mappers 1
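# Optional sanity check (not part of the original gist): back in spark-shell, the imported
# Avro data can be row-counted and compared against the source tables, e.g.
#   sqlContext.read.avro("user/cloudera/problem1/orders").count()
#   sqlContext.read.avro("user/cloudera/problem1/order-items").count()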
# connect to the target MySQL database
mysql -h ms.itversity.com -u retail_user -p
# password: itversity
create table gontiv_result (
  order_date varchar(255) not null,
  order_status varchar(255) not null,
  total_orders int,
  total_amount numeric,
  constraint pk_order_result primary key (order_date, order_status)
);
sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/retail_export \
--username retail_user \
--password itversity \
--export-dir user/cloudera/problem1/result4a-csv \
--table gontiv_result \
--columns "order_date,order_status,total_amount,total_orders"