Skip to content

Instantly share code, notes, and snippets.

@qi-qi
Last active April 29, 2019 14:23
Show Gist options
  • Save qi-qi/27b9bde5e3f6c6a28b3790f0a712d565 to your computer and use it in GitHub Desktop.
Save qi-qi/27b9bde5e3f6c6a28b3790f0a712d565 to your computer and use it in GitHub Desktop.
test("hello") {
  // Loads a CSV file, taking the column names from its first row.
  def readWithHeader(path: String) = spark.read.option("header", "true").csv(path)

  // Relative deviation of `countColumn` from the old_dw_counting baseline:
  // (count - baseline) / baseline, exposed under the name `diffName`.
  def diffAgainstOldDw(countColumn: String, diffName: String) = {
    val baseline = $"old_dw_counting"
    (($"$countColumn" - baseline) / baseline).as(diffName)
  }

  val oldDw = readWithHeader("/Users/qi/Desktop/old_dw.csv")
  val iabCounts = readWithHeader("/Users/qi/Desktop/iab.csv")
  val podindexCounts = readWithHeader("/Users/qi/Desktop/podindex.csv")
  val legacyindexCounts = readWithHeader("/Users/qi/Desktop/legacyindex.csv")
  val realtimeindexCounts = readWithHeader("/Users/qi/Desktop/realtimeindex.csv")

  val diffColumns = Seq(
    diffAgainstOldDw("legacyindex_counting", "legacyindex_diff"),
    diffAgainstOldDw("podindex_counting", "podindex_diff"),
    diffAgainstOldDw("iab_counting", "iab_diff"),
    diffAgainstOldDw("realtimeindex_counting", "realtimeindex_diff")
  )

  // Left outer joins keep every old_dw row; each source is attached in turn on (dt, show_id).
  val joinKeys = Seq("dt", "show_id")
  val combined = Seq(legacyindexCounts, podindexCounts, iabCounts, realtimeindexCounts)
    .foldLeft(oldDw)((joined, source) => joined.join(source, joinKeys, "leftouter"))

  // Keep all joined columns plus the four diff columns, restricted to rows whose dt is in month 3.
  val marchRows = combined
    .select($"*" +: diffColumns: _*)
    .where(month($"dt") === 3)
  // marchRows.show(100, false)

  // A single partition so the result lands in one CSV part file.
  marchRows
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Users/qi/Desktop/march")
}
test("hello2") {
  // Loads a CSV file, taking the column names from its first row.
  def readWithHeader(path: String) = spark.read.option("header", "true").csv(path)

  // Relative deviation of `countColumn` from the old_dw_count baseline:
  // (count - baseline) / baseline, exposed under the name `diffName`.
  def diffAgainstOldDw(countColumn: String, diffName: String) = {
    val baseline = $"old_dw_count"
    (($"$countColumn" - baseline) / baseline).as(diffName)
  }

  val oldDw = readWithHeader("/Users/qi/Desktop/listen_old_dw_month_country_20190204-20190412.csv")
  val indexCounts = readWithHeader("/Users/qi/Desktop/listen_index_month_country_20190204-20190412.csv")

  // Output column order is legacy, iab, podindex, realtimeindex.
  val diffColumns = Seq(
    diffAgainstOldDw("legacyindex_count", "legacyindex_diff"),
    diffAgainstOldDw("iab_count", "iab_diff"),
    diffAgainstOldDw("podindex_count", "podindex_diff"),
    diffAgainstOldDw("realtimeindex_count", "realtimeindex_diff")
  )

  // Left outer join keeps every old_dw row even when the index file has no matching key.
  val joinKeys = Seq("month", "show_id", "show_url", "geo_country_iso")
  val withDiffs = oldDw
    .join(indexCounts, joinKeys, "leftouter")
    .select($"*" +: diffColumns: _*)
  // Disabled steps kept from the original chain:
  // .filter($"month"===4)
  // .show(100, false)

  // A single partition so the result lands in one CSV part file.
  withDiffs
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Users/qi/Desktop/multi-index-month-country/")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment