Skip to content

Instantly share code, notes, and snippets.

@hsm207
Last active September 16, 2019 13:27
Show Gist options
  • Save hsm207/a65dffe3ec23ff2e41b6a44bdf11c7ba to your computer and use it in GitHub Desktop.
Save hsm207/a65dffe3ec23ff2e41b6a44bdf11c7ba to your computer and use it in GitHub Desktop.
Implementation of `getAnalytics`
/** Reads one day's analytics JSON from S3 and flattens it into a ranked DataFrame.
  *
  * Steps:
  *  - Builds the S3 path via `getS3Path` and reads it with `readJSON`.
  *  - Drops duplicate raw rows.
  *  - Explodes `payload` into `x`.
  *  - Explodes `x1` into `c10`, keeping one null row when `x1` has no elements.
  *  - Tags each row with a Spark-generated `id`.
  *  - Builds the composite key `c12` from `x2`, `x3` and `c3`.
  *  - Ranks the rows of each `c12` group by `id` into `c13`.
  *
  * @param bucketName      S3 bucket, passed to `getS3Path`.
  * @param brand           brand, passed to `getS3Path`.
  * @param bucketYear      passed to `getS3Path` as `year`.
  * @param bucketMonth     passed to `getS3Path` as `month`.
  * @param bucketDay       passed to `getS3Path` as `day`.
  * @param candidate_field not referenced in the visible body.
  *                        NOTE(review): confirm whether an elided step uses it.
  * @param groups          not referenced in the visible body.
  *                        NOTE(review): confirm whether an elided step uses it.
  * @return the flattened rows, with the rank within each `c12` group in `c13`.
  */
def getAnalytics(bucketName: String, brand: String, bucketYear: String, bucketMonth: String, bucketDay: String, candidate_field: String=candidateField, groups: String=Groups): DataFrame = {
  // Window spec: rows sharing the same `c12` key, ordered by the generated `id`.
  // It is never reassigned, so it is an immutable val.
  val rankingOrderedIds = Window.partitionBy("c12").orderBy("id")
  val s3PathAnalytics = getS3Path(bucketName, brand, bucketFolder, year = bucketYear, month = bucketMonth, day = bucketDay)

  readJSON(s3PathAnalytics)
    .distinct
    .withColumn("x", explode($"payload"))
    // a few more calls to withColumn to create columns
    // A plain explode of an empty array emits no rows.
    // Substituting a one-element [null] array keeps the parent row, with c10 = null.
    .withColumn("c10", explode(when(size(col("x1")) > 0, col("x1")).otherwise(array(lit(null).cast("string")))))
    // a few more calls to withColumn to create columns
    // NOTE(review): monotonically_increasing_id is unique but not consecutive.
    // Spark documents it as non-deterministic because its value depends on partition IDs.
    // If this plan is recomputed, `id` (and therefore `c13`) may differ between evaluations.
    .withColumn("id", monotonically_increasing_id)
    // a few more calls to withColumn to create columns
    .withColumn("c12", concat_ws("X", $"x2", $"x3", $"c3"))
    // `id` is unique, so rank() has no ties: each c12 group is numbered 1, 2, 3, ...
    .withColumn("c13", rank().over(rankingOrderedIds))
    // NOTE(review): every row now carries a distinct `id`.
    // Unless an elided step overwrites `id`, this `.distinct` cannot drop rows and likely only costs a shuffle.
    .distinct
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment