Skip to content

Instantly share code, notes, and snippets.

@BrianLondon
Created December 23, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BrianLondon/9cdcdb958f8c1c7f9b9a to your computer and use it in GitHub Desktop.
Save BrianLondon/9cdcdb958f8c1c7f9b9a to your computer and use it in GitHub Desktop.
// Returns Some((user_id, email_content)) if an email should be
// triggered None otherwise
def eventTriggeredEmail(userId: Int, eventDescription: String):
Option[(Int, String)] = ???
object Emailer extends App with MyConfigurationMixin {
val sparkConf = new SparkConf()
.setAppName("Emailer")
.setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Tuple is user_id, email
val userRecords: JdbcRDD[(Int, String)] = GetUsersFromDatabase()
// Tuple is user_id, event json
val events: DStream[(Int, String)] = ConnectToExternalEventStream()
val emails: DStream[(Int, String)] = events
.flatMap(eventTriggeredEmail.tupled)
emails
.transform(rdd => rdd.join(userRecords))
.foreach { case (userId, (eventBody, emailAddress)) =>
sendEmail(emailAddress, emailBody)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment