Created June 13, 2016 02:45
-
-
Save larkz/3f4924296b992e6e1873f9f713d7331e to your computer and use it in GitHub Desktop.
Bolton Measurement Summary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Bolton measurement summary (spark-shell script).
// Builds a per-customer feature set from a random sample of customers whose non-recharge,
// non-bill-payment GMV is positive, joins it with per-customer transaction counts and with
// impression/view/click totals aggregated from measurement events, and writes the result
// as a header-less CSV.
//
// Assumes a spark-shell session: `sqlContext` and `import sqlContext.implicits._`
// (needed for the `$"col"` syntax) are expected to be in scope.

// Run identifier; only used to suffix the output path.
val num = "27"

// Measurement window. Both bounds are EXCLUSIVE string comparisons on `dateday`, so with
// these values the window is 2016-06-01 .. 2016-06-09 -- assuming `dateday` is an ISO
// "yyyy-MM-dd" string (TODO confirm against the parquet schema).
val windowStartExclusive = "2016-05-31"
val windowEndExclusive = "2016-06-10"

val measurementData = sqlContext.read.parquet("/workspace/midgar/prod/ws/measurements/base")
val mData = measurementData.filter(
  $"dateday" > windowStartExclusive && $"dateday" < windowEndExclusive)
val mDataSubset = mData.select(
  "customer_id", "channel", "group", "impression", "view", "click", "path_event_type", "dateday")

// NOTE(review): an earlier, disabled created_at filter was written as
// `$"created_at" < "2016-04-01" || $"created_at" > "2016-03-01"`, which is true for every
// non-null value; if re-enabled it most likely needs `&&` instead of `||`.
val salesOrderData = sqlContext.table("marketplace.sales_order_snapshot")

// Orders whose title contains any of these substrings are excluded from GMV.
// NOTE(review): `contains` on a null title yields null, which `filter` treats as false,
// so orders with a null title are dropped here as well -- confirm that is intended.
val excludedTitleKeywords = Seq("Recharge", "Bill Payment")
val salesOrderData2 = excludedTitleKeywords.foldLeft(salesOrderData) { (orders, keyword) =>
  orders.filter(!$"title".contains(keyword))
}
val salesOrderGMVAgg = salesOrderData2.groupBy($"customer_id").sum("grandtotal").toDF("customer_id", "sum_gmv")

// Fraction of active (sum_gmv > 0) customers to keep, sampled without replacement.
// No seed is passed, so the chosen customers differ between runs.
val customerSampleFraction = 0.07
val activeSalesOrderGMVAggSample =
  salesOrderGMVAgg.filter($"sum_gmv" > 0).sample(withReplacement = false, fraction = customerSampleFraction)

// Join these ACTIVE customers with their number of transactions.
// NOTE(review): counts come from the UNFILTERED order table, so recharge / bill-payment
// orders are counted in txn_count even though they are excluded from sum_gmv -- confirm.
val salesOrderNumTxnAgg = salesOrderData.groupBy($"customer_id").count().toDF("customer_id", "txn_count")
val featureSet = activeSalesOrderGMVAggSample.join(salesOrderNumTxnAgg, "customer_id")

// Spark aggregation step: total impressions, views and clicks per
// (customer_id, group, channel, path_event_type).
val mDataAggregate = mDataSubset.groupBy("customer_id", "group", "channel", "path_event_type").agg(
  "impression" -> "sum",
  "view" -> "sum",
  "click" -> "sum"
)

// Join on customer_id: only sampled customers that also have measurement rows in the window survive.
val mFeatureAttSet = featureSet.join(mDataAggregate, "customer_id")

// Written without a header row.
mFeatureAttSet.write.format("com.databricks.spark.csv").option("header", "false").save(s"mFeatureAttSet${num}.csv")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.