Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spark Streaming windowing example
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Time
import kafka.serializer.StringDecoder
import org.joda.time.DateTime
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
// Kafka direct stream of (key, value) pairs; keys and values are both decoded as strings.
// `ssc`, `kafkaParams` and `topics` are expected to be defined before this point.
val ratingsStream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )
// Turns every micro-batch of raw Kafka messages into rating rows, persists them to Cassandra,
// and hands the rows (tagged with the batch time) on to the downstream stream operations.
val msgs = ratingsStream.transform {
  (message: RDD[(String, String)], batchTime: Time) => {
    // Convert each RDD of the batch into a Ratings DataFrame.
    // Rating data has the format user_id::movie_id::rating::timestamp (fields are separated
    // by a double colon). The Kafka key is not used.
    val ratingsDF = message.flatMap { case (_, nxtRating) =>
      // A record with missing or non-numeric fields yields None and is skipped, so that a
      // single malformed message cannot fail the whole micro-batch.
      scala.util.Try {
        val fields = nxtRating.split("::")
        Rating(
          fields(0).trim.toInt,
          fields(1).trim.toInt,
          fields(2).trim.toFloat,
          fields(3).trim.toLong // timestamp is stored exactly as received
        )
      }.toOption
    }.toDF("user_id", "movie_id", "rating", "timestamp")

    // This can be used to debug the DataFrame:
    //if (debugOutput)
    //  ratingsDF.show()

    // Save the DataFrame to Cassandra.
    // Note: Cassandra has been initialized through spark-env.sh
    // Specifically, export SPARK_JAVA_OPTS=-Dspark.cassandra.connection.host=127.0.0.1
    ratingsDF.write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("keyspace" -> "movie_db", "table" -> "rating_by_movie"))
      .save()

    // Tag every row with the time of the batch it arrived in and expose the rows as an RDD,
    // which is what transform has to return.
    ratingsDF
      .withColumn("batchtime", org.apache.spark.sql.functions.lit(batchTime.milliseconds))
      .rdd
  }
}
// For every batch, render up to ten rows as strings and append them to the `list` widget.
msgs.foreachRDD { rdd =>
  val sample = rdd.take(10).toList
  list.appendAll(sample.map(row => row.toString))
}
// Show a moving average of the ratings in the plot: a 10-second window that slides every
// 2 seconds. `data` accumulates the (window time in ms, mean rating) points.
val data = new collection.mutable.ArrayBuffer[(Long, Double)]()
// foreachRDD returns Unit, so `w` holds no useful value; the name is kept only so that any
// code referring to it keeps compiling.
val w = msgs.window(Seconds(10), Seconds(2)).foreachRDD { (rdd, t) =>
  // A window without any ratings has no meaningful average, so no point is plotted for it.
  if (!rdd.isEmpty()) {
    // mean already yields a Double, so no further conversion is needed.
    val ndata = Seq((t.milliseconds, rdd.map(_.getAs[Float]("rating")).mean))
    org.apache.log4j.Logger.getLogger("lines").info("# " + ndata.size)
    // `data ++= ndata` appends the new point and evaluates to the updated buffer.
    line.addAndApply(data ++= ndata)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment