Skip to content

Instantly share code, notes, and snippets.

@rjsen
Created June 4, 2015 07:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjsen/eb9f8c165fcec293d469 to your computer and use it in GitHub Desktop.
Save rjsen/eb9f8c165fcec293d469 to your computer and use it in GitHub Desktop.
Example Streaming Application
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.dstream.DStream._
import org.joda.time.DateTime
import org.json4s.jackson.JsonMethods._
import scala.util.Try
/**
 * A single purchase record, as extracted from a parsed JSON message by `Example.extract`.
 *
 * @param item_id id of the item this purchase refers to
 * @param amount  amount of this purchase
 * @param time    instant of the purchase; `Example.extract` hands it to `new DateTime(...)`,
 *                which reads a `Long` as milliseconds since the Unix epoch
 */
case class Purchase(
  item_id: String,
  amount: BigDecimal,
  time: Long
)
/**
 * Grouping key under which purchase amounts are combined in `Example.transformStream`.
 *
 * @param item_id id of the item
 * @param time    time bucket for the item; `Example.extract` fills it with the purchase time
 *                whose minute, second and millisecond fields have been set to zero
 */
case class Key(
  item_id: String,
  time: DateTime
)
/**
 * Output record of `Example.transformStream`: the combined amount for one [[Key]].
 *
 * @param item_id id of the item (taken from the key)
 * @param time    time bucket the total belongs to (taken from the key)
 * @param total   sum of the purchase amounts that were reduced under that key
 */
case class Summary(
  item_id: String,
  time: DateTime,
  total: BigDecimal
)
/**
 * Example streaming transformation: turns raw JSON purchase messages into
 * per-item, per-hour purchase totals.
 */
object Example {

  /**
   * json4s formats consumed implicitly by `extractOpt` in [[extract]].
   *
   * This has to be an `org.json4s.Formats` instance. `org.json4s.DefaultJsonFormats`
   * is a bundle of `JsonFormat` type-class instances, not a `Formats`, so it cannot
   * satisfy the implicit parameter of `extractOpt`.
   */
  implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats

  /**
   * Decodes one raw message into a keyed purchase amount.
   *
   * @param message raw text expected to be the JSON form of a [[Purchase]]
   * @return `Some(key -> amount)`, where the key holds the item id and the purchase time
   *         with its minute, second and millisecond fields zeroed; `None` when parsing
   *         throws a non-fatal exception or the JSON cannot be extracted as a [[Purchase]]
   *         (such messages are dropped without any report)
   */
  def extract(message: String): Option[(Key, BigDecimal)] =
    for {
      parsed   <- Try(parse(message)).toOption
      purchase <- parsed.extractOpt[Purchase]
    } yield {
      // NOTE(review): `new DateTime(long)` uses the JVM default time zone, so the hour
      // bucket (and the zone carried by the key) depends on where this runs. Confirm
      // whether an explicit zone such as UTC is intended.
      val hourStart = new DateTime(purchase.time)
        .withMinuteOfHour(0)
        .withSecondOfMinute(0)
        .withMillisOfSecond(0)
      Key(purchase.item_id, hourStart) -> purchase.amount
    }

  /**
   * Builds the stream of summaries from a stream of raw messages.
   *
   * Messages for which [[extract]] yields `None` are discarded; the remaining amounts
   * are added together per [[Key]] via `reduceByKey` and re-shaped into [[Summary]] values.
   *
   * NOTE(review): `DStream.reduceByKey` is understood to reduce within each batch of the
   * stream, so a key may appear once per batch rather than once overall. Confirm against
   * the Spark version in use.
   *
   * @param stream raw JSON purchase messages
   * @return one [[Summary]] per distinct key produced by the reduction
   */
  def transformStream(stream: DStream[String]): DStream[Summary] =
    stream
      .flatMap(extract)
      .reduceByKey(_ + _)
      .map { case (key, amount) =>
        Summary(key.item_id, key.time, amount)
      }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment