Created
December 23, 2015 14:15
-
-
Save BrianLondon/9cdcdb958f8c1c7f9b9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Returns Some((user_id, email_content)) if an email should be | |
// triggered None otherwise | |
def eventTriggeredEmail(userId: Int, eventDescription: String): | |
Option[(Int, String)] = ??? | |
object Emailer extends App with MyConfigurationMixin { | |
val sparkConf = new SparkConf() | |
.setAppName("Emailer") | |
.setMaster("local[4]") | |
val ssc = new StreamingContext(sparkConf, Seconds(1)) | |
// Tuple is user_id, email | |
val userRecords: JdbcRDD[(Int, String)] = GetUsersFromDatabase() | |
// Tuple is user_id, event json | |
val events: DStream[(Int, String)] = ConnectToExternalEventStream() | |
val emails: DStream[(Int, String)] = events | |
.flatMap(eventTriggeredEmail.tupled) | |
emails | |
.transform(rdd => rdd.join(userRecords)) | |
.foreach { case (userId, (eventBody, emailAddress)) => | |
sendEmail(emailAddress, emailBody) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment