// Bolton Measurement Summary
// Source: GitHub gist larkz/3f4924296b992e6e1873f9f713d7331e (created June 13, 2016 by @larkz).
// Suffix used in the output file name written at the end of this script.
val num = "27"

// Raw measurement records, read from the production parquet location.
val measurementData = sqlContext.read.parquet("/workspace/midgar/prod/ws/measurements/base")

// Restrict to days strictly after 2016-05-31 and strictly before 2016-06-10
// (both bounds are exclusive; comparison is against the literal strings).
val mData = measurementData
  .where($"dateday" > "2016-05-31")
  .where($"dateday" < "2016-06-10")

// Keep only the columns needed downstream.
val mDataSubset = mData.select(
  "customer_id", "channel", "group", "impression",
  "view", "click", "path_event_type", "dateday"
)
// Sales orders from the marketplace snapshot table.
// An optional creation-date window was drafted here. The earlier draft joined its bounds
// with `||`, which is true for every non-null date; a real window needs `&&`:
//   .filter($"created_at" > "2016-03-01" && $"created_at" < "2016-04-01")
val salesOrderData = sqlContext.table("marketplace.sales_order_snapshot")

// Exclude recharge and bill-payment orders by title.
// NOTE(review): for a null title `contains` yields null and `filter` drops the row,
// so orders without a title are excluded as well — confirm that is intended.
val salesOrderData2 = salesOrderData
  .filter(!$"title".contains("Recharge"))
  .filter(!$"title".contains("Bill Payment"))

// Total grandtotal (GMV) per customer over the filtered orders.
val salesOrderGMVAgg = salesOrderData2.groupBy($"customer_id").sum("grandtotal").toDF("customer_id", "sum_gmv")

// Customers with positive GMV, then a random sample without replacement (fraction 0.07).
// NOTE(review): no seed is given, so the sampled customers differ between runs.
val activeSalesOrderGMVAggSample = salesOrderGMVAgg.filter($"sum_gmv" > 0).sample(false, 0.07)

// Join these ACTIVE customers with their number of transactions.
// Counted over the same filtered orders as the GMV (previously the unfiltered table),
// so sum_gmv and txn_count describe the same set of orders.
val salesOrderNumTxnAgg = salesOrderData2.groupBy($"customer_id").count().toDF("customer_id", "txn_count")
val featureSet = activeSalesOrderGMVAggSample.join(salesOrderNumTxnAgg, "customer_id")
// Spark aggregation step: for each (customer_id, group, channel, path_event_type),
// sum the impression, view and click columns (result columns get Spark-generated names).
val mDataAggregate = mDataSubset
  .groupBy("customer_id", "group", "channel", "path_event_type")
  .agg(Map(
    "impression" -> "sum",
    "view" -> "sum",
    "click" -> "sum"
  ))

// Inner-join the sampled customer features onto the aggregated measurement rows;
// only customers present in both sides are kept.
val mFeatureAttSet = featureSet.join(mDataAggregate, "customer_id")

// Write a headerless CSV via the spark-csv data source, named with the `num` suffix.
val outputPath = s"mFeatureAttSet${num}.csv"
mFeatureAttSet.write
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .save(outputPath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment