@afranzi
Created May 30, 2019 14:47
Spark Job Pattern
class ActivityInsightsJob(activityReader: SparkReader,
                          analyticsReader: SparkReader,
                          insightsWriter: SparkWriter
                         )(implicit val sparkSession: SparkSession) extends SparkTask {

  def run(): Unit = {
    // Build per-activity metrics from the analytics events.
    val metricsDF = analyticsReader.read(Some(AnalyticsSchema))
      .transform(Events.isActivityImpression)
      .transform(Events.isActivityView)
      .transform(Events.isBookmarked)
      .transform(withCountersBy("activityId"))
      .transform(withRatingScore)

    // Clean the activities, then enrich them with the metrics computed above.
    val activityInsightsDF = activityReader.read(Some(ActivitySchema))
      .transform(cleanActivity)
      .transform(enrichActivityWithMetrics(metricsDF))
      .transform(withHashId("activityId"))
      .transform(withNow)

    // Append the result to the insights output.
    insightsWriter.write(activityInsightsDF, mode = Append)
  }
}
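
The job above relies on types and helpers that the gist does not define (SparkReader, SparkWriter, SparkTask, withHashId, withNow and others). The following is a minimal sketch of what they could look like, not the author's implementation. The trait signatures, the output column names "hashId" and "now", and the choice of SHA-256 are all assumptions made for illustration. The point it shows is that each step is a plain DataFrame => DataFrame function, which is what lets Dataset.transform chain them.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, current_timestamp, sha2}
import org.apache.spark.sql.types.StructType

// Hypothetical shapes for the job's collaborators; the gist does not show them.
trait SparkReader {
  def read(schema: Option[StructType] = None): DataFrame
}

trait SparkWriter {
  def write(df: DataFrame, mode: SaveMode): Unit
}

trait SparkTask {
  def run(): Unit
}

object Transformations {
  // Hypothetical: adds a "now" column with the processing timestamp.
  def withNow(df: DataFrame): DataFrame =
    df.withColumn("now", current_timestamp())

  // Hypothetical: curried, so withHashId("activityId") is a DataFrame => DataFrame
  // that can be passed straight to Dataset.transform.
  def withHashId(column: String)(df: DataFrame): DataFrame =
    df.withColumn("hashId", sha2(col(column).cast("string"), 256))
}

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("transform-sketch")
      .getOrCreate()
    import spark.implicits._

    val activities = Seq("a1", "a2").toDF("activityId")

    // Same chaining style as in ActivityInsightsJob.run().
    val result = activities
      .transform(Transformations.withHashId("activityId"))
      .transform(Transformations.withNow)

    result.show(truncate = false)
    spark.stop()
  }
}
```

Keeping each step as a small named function means every transformation can be unit tested on a tiny in-memory DataFrame, while the readers and writers can be swapped for test doubles when exercising the job as a whole.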