/*
Launch spark-shell (Spark 1.x on Scala 2.10) with the spark-avro package:

spark-shell --master yarn \
  --conf spark.ui.port=12345 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 2G \
  --packages com.databricks:spark-avro_2.10:2.0.1
*/
import com.databricks.spark.avro._
import org.apache.spark.sql.functions._ // auto-imported in spark-shell; explicit here for scripts

val orderDF = sqlContext.read.avro("user/cloudera/problem1/orders")
val orderItemsDF = sqlContext.read.avro("user/cloudera/problem1/order-items")
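// Optional sanity check (a sketch; output depends on the imported data): confirm
// both frames loaded with the expected retail_db schemas before joining.
orderDF.printSchema()
orderItemsDF.printSchema()
orderDF.show(5)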
//a. Using the Data Frames API only - order_date should be in YYYY-MM-DD format
val joinedOrderDataDF = orderDF.join(orderItemsDF, orderDF("order_id") === orderItemsDF("order_item_order_id"))
// order_date holds epoch milliseconds; from_unixtime expects seconds, hence the division by 1000
val dataFrameResult = joinedOrderDataDF.
  groupBy(to_date(from_unixtime(col("order_date") / 1000)).alias("order_formatted_date"), col("order_status")).
  agg(round(sum("order_item_subtotal"), 2).alias("total_amount"),
      countDistinct("order_id").alias("total_orders")).
  orderBy(col("order_formatted_date").desc, col("order_status"), col("total_amount").desc, col("total_orders"))
//Store the result as a parquet file into HDFS using gzip compression under folder
//user/cloudera/problem1/result4a-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
dataFrameResult.coalesce(1).write.parquet("user/cloudera/problem1/result4a-gzip")

//Store the result as a parquet file into HDFS using snappy compression under folder
//user/cloudera/problem1/result4a-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
dataFrameResult.coalesce(1).write.parquet("user/cloudera/problem1/result4a-snappy")

//Store the result as a CSV file into HDFS with no compression under folder
//user/cloudera/problem1/result4a-csv
dataFrameResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4a-csv")
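// Optional verification (sketch): read one output back and confirm the column
// order (date, status, amount, orders) that the Sqoop export below relies on.
sqlContext.read.parquet("user/cloudera/problem1/result4a-gzip").show(5)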
//b. Using Spark SQL - here order_date should be in YYYY-MM-DD format
orderDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
val sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(o.order_date/1000 as bigint))) as order_date, " +
  "o.order_status, count(distinct o.order_id) total_orders, " +
  "cast(sum(oi.order_item_subtotal) as decimal(10,2)) total_amount " +
  "from orders o, orderItems oi " +
  "where o.order_id = oi.order_item_order_id " +
  "group by to_date(from_unixtime(cast(o.order_date/1000 as bigint))), o.order_status " +
  "order by order_date desc, order_status asc, total_amount desc, total_orders asc")
//Store the result as a parquet file into HDFS using gzip compression under folder
//user/cloudera/problem1/result4b-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
sqlResult.coalesce(1).write.parquet("user/cloudera/problem1/result4b-gzip")

//Store the result as a parquet file into HDFS using snappy compression under folder
//user/cloudera/problem1/result4b-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlResult.coalesce(1).write.parquet("user/cloudera/problem1/result4b-snappy")

//Store the result as a CSV file into HDFS with no compression under folder
//user/cloudera/problem1/result4b-csv
sqlResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4b-csv")
//c. Using combineByKey on RDDs - no need to format order_date or total_amount
// (order_id, (order_date, order_status))
val orderRDD = orderDF.rdd.map(rec => (rec(0).toString, (rec(1).toString, rec(3).toString)))
// (order_item_order_id, order_item_subtotal)
val orderItemRDD = orderItemsDF.rdd.map(rec => (rec(1).toString, rec(4).toString.toFloat))
val joinedOrderDataRDD = orderRDD.join(orderItemRDD)
// key by (order_date, order_status); value carries (subtotal, order_id)
val comByKeyInt = joinedOrderDataRDD.map(rec => (rec._2._1, (rec._2._2, rec._1)))
// Exploratory sanity checks kept from development (commented out):
//val intermediateResult = comByKeyInt.filter(rec => rec._1._2 == "SUSPECTED_FRAUD").filter(rec => rec._1._1 == "1377835200000")
//intermediateResult.map(rec => (rec._1, rec._2._1)).reduceByKey((t, v) => t + v).first
//intermediateResult.map(rec => (rec._1, Set(rec._2))).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).take(15).foreach(println)
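// combineByKey takes three functions: createCombiner (build the initial combined
// value from the first value seen for a key), mergeValue (fold another value into
// a partition-local combiner) and mergeCombiners (merge combiners across
// partitions). A toy sketch, independent of the data above, that sums amounts
// and collects distinct ids per key:
val toy = sc.parallelize(Seq(("a", (1.0F, "o1")), ("a", (2.0F, "o2")), ("b", (3.0F, "o3"))))
val toyCombined = toy.combineByKey(
  (v: (Float, String)) => (v._1, Set(v._2)),
  (c: (Float, Set[String]), v: (Float, String)) => (c._1 + v._1, c._2 + v._2),
  (c1: (Float, Set[String]), c2: (Float, Set[String])) => (c1._1 + c2._1, c1._2 ++ c2._2))
// toyCombined.collect() yields ("a", (3.0, Set("o1", "o2"))) and ("b", (3.0, Set("o3")))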
// Aggregate (subtotal sum, distinct order-id set) per (order_date, order_status) key,
// then reshape to (order_date, order_status, total_amount, total_orders)
val comByKeyResult = comByKeyInt.combineByKey(
    (x: (Float, String)) => (x._1, Set(x._2)),
    (x: (Float, Set[String]), y: (Float, String)) => (x._1 + y._1, x._2 + y._2),
    (x: (Float, Set[String]), y: (Float, Set[String])) => (x._1 + y._1, x._2 ++ y._2)).
  map(x => (x._1._1, x._1._2, x._2._1, x._2._2.size)).toDF().
  orderBy(col("_1").desc, col("_2"), col("_3").desc, col("_4"))
//Store the result as a parquet file into HDFS using gzip compression under folder
//user/cloudera/problem1/result4c-gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
comByKeyResult.coalesce(1).write.parquet("user/cloudera/problem1/result4c-gzip")

//Store the result as a parquet file into HDFS using snappy compression under folder
//user/cloudera/problem1/result4c-snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
comByKeyResult.coalesce(1).write.parquet("user/cloudera/problem1/result4c-snappy")

//Store the result as a CSV file into HDFS with no compression under folder
//user/cloudera/problem1/result4c-csv
comByKeyResult.map(rec => rec.mkString(",")).coalesce(1).saveAsTextFile("user/cloudera/problem1/result4c-csv")
# CCA 175 Arun
# Import the orders table as Snappy-compressed Avro files
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir user/cloudera/problem1/orders \
--as-avrodatafile \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--num-mappers 1

# Import the order_items table the same way
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table order_items \
--target-dir user/cloudera/problem1/order-items \
--as-avrodatafile \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--num-mappers 1
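# Optional check (sketch): confirm the Avro files landed where the spark-shell
# session above expects to read them
hdfs dfs -ls user/cloudera/problem1/orders
hdfs dfs -ls user/cloudera/problem1/order-items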
# Connect to the database (password: itversity)
mysql -h ms.itversity.com -u retail_user -p

# Create the target table for the export; (order_date, order_status) is the natural key
create table gontiv_result (
  order_date varchar(255) not null,
  order_status varchar(255) not null,
  total_orders int,
  total_amount numeric,
  constraint pk_order_result primary key (order_date, order_status)
);
# Export the DataFrame CSV result into MySQL; --columns maps the CSV field order
# (order_date, order_status, total_amount, total_orders) onto the table columns
sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/retail_export \
--username retail_user \
--password itversity \
--export-dir user/cloudera/problem1/result4a-csv \
--table gontiv_result \
--columns "order_date,order_status,total_amount,total_orders"