Skip to content

Instantly share code, notes, and snippets.

Created November 2, 2015 17:54
Show Gist options
  • Save retroryan/367e177156f9561af473 to your computer and use it in GitHub Desktop.
Save retroryan/367e177156f9561af473 to your computer and use it in GitHub Desktop.
spark streaming windowing example
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Time
import kafka.serializer.StringDecoder
import org.joda.time.DateTime
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
// Create a direct Kafka stream of (String, String) key/value pairs.
// `ssc`, `kafkaParams` and `topics` are not defined in this snippet; they must come from the surrounding context.
val ratingsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
// Transform every batch RDD of the stream, with access to the batch time.
// NOTE(review): this snippet appears to have lost lines (the braces opened below are never
// closed in the visible text and several statements are truncated) -- TODO restore from the original.
val msgs = ratingsStream.transform {
(message: RDD[(String, String)], batchTime: Time) => {
// convert each RDD from the batch into a Ratings DataFrame
// rating data has the format user_id::movie_id::rating::timestamp (fields are split on "::" below)
// NOTE(review): the receiver of the pattern-matching block below seems to be missing
// (presumably `message.map`) -- TODO confirm.
val ratingsDF = {
case (key, nxtRating) => nxtRating.split("::")
}.map(rating => {
// Field 3 is parsed as a Long and passed through a Joda DateTime to obtain milliseconds.
val timestamp: Long = new DateTime(rating(3).trim.toLong).getMillis
// Fields 0..2 are trimmed and parsed as Int, Int and Float respectively.
Rating(rating(0).trim.toInt, rating(1).trim.toInt, rating(2).trim.toFloat, timestamp)
} ).toDF("user_id", "movie_id", "rating", "timestamp")
// this can be used to debug dataframes
//if (debugOutput)
// save the DataFrame to Cassandra
// Note: Cassandra has been initialized through
// Specifically, export
// NOTE(review): the two comment lines above are truncated, and the start of the call chain that
// `.options` belongs to is not visible here (presumably a DataFrame write to Cassandra) -- TODO confirm.
.options(Map("keyspace" -> "movie_db", "table" -> "rating_by_movie"))
//val data = new collection.mutable.ArrayBuffer[(Long, Double)]()
// Add the batch time (in milliseconds) as a literal column named "batchtime".
val dfWithBatchTime = ratingsDF.withColumn("batchtime", org.apache.spark.sql.functions.lit(batchTime.milliseconds))
// show the first ten of each RDD: up to 10 elements of every batch are appended to `list`
// NOTE(review): `list` is not defined in the visible snippet, and the parentheses on the line
// below are unbalanced -- the end of the statement appears to be truncated; TODO confirm.
msgs.foreachRDD(rdd => list.appendAll(rdd.take(10)
// show the moving average of 10s in the plot
// Buffer of (timestamp in ms, mean rating) points that grows as windows are processed.
val data = new collection.mutable.ArrayBuffer[(Long, Double)]()
// Window of 10 seconds sliding every 2 seconds; the callback receives the windowed RDD and its time.
// NOTE(review): the closing brace of this callback is not visible in the snippet -- TODO confirm.
val w = msgs.window(Seconds(10), Seconds(2)).foreachRDD{(rdd, t) =>
// NOTE(review): the expression below is garbled -- `[Float]("rating")` has no receiver
// (presumably something like `rdd.map(_.getAs[Float]("rating"))`) -- TODO confirm.
val ndata = Seq((t.milliseconds,[Float]("rating")).mean.toDouble))
// Log how many new points were produced for this window.
org.apache.log4j.Logger.getLogger("lines").info("# " + ndata.size)
// Append the new points to `data` and pass the result to `line`, which is not defined in this
// snippet (presumably a notebook plot widget -- verify).
line.addAndApply(data ++= ndata)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment