// spark streaming windowing example
// Gist by @retroryan (gist 367e177156f9561af473), created November 2, 2015 17:54.
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Time
import kafka.serializer.StringDecoder
import org.joda.time.DateTime
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
// Direct Kafka stream of (key, message) pairs; only the message value is used below.
val ratingsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

/**
 * Per batch: parse the Kafka message values into a ratings DataFrame, append that DataFrame to
 * the Cassandra table movie_db.rating_by_movie, and return the parsed rows as an RDD[Row] with an
 * extra `batchtime` column holding the batch time in milliseconds.
 *
 * Each message value is expected to look like `user_id::movie_id::rating::timestamp`.
 * Values that cannot be parsed (fewer than four fields, or non-numeric fields) are skipped
 * rather than failing the batch.
 */
val msgs = ratingsStream.transform { (message: RDD[(String, String)], batchTime: Time) =>
  val ratingsDF = message.flatMap { case (_, rawRating) =>
    val fields = rawRating.split("::")
    // Try turns a malformed record into None, so one bad message cannot fail the whole batch.
    // Possible failures are a missing field (ArrayIndexOutOfBounds) or a bad number (NumberFormat).
    scala.util.Try(
      Rating(fields(0).trim.toInt, fields(1).trim.toInt, fields(2).trim.toFloat, fields(3).trim.toLong)
    ).toOption
  }.toDF("user_id", "movie_id", "rating", "timestamp")

  // Uncomment to inspect each batch while debugging:
  // if (debugOutput) ratingsDF.show()

  // Append the batch to Cassandra. The Cassandra connection is configured outside this code,
  // e.g. in spark-env.sh: export SPARK_JAVA_OPTS=-Dspark.cassandra.connection.host=127.0.0.1
  // NOTE(review): this is a side-effecting action inside `transform`. Spark recommends performing
  // output operations in `foreachRDD`, so consider moving the write there.
  ratingsDF.write.format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append)
    .options(Map("keyspace" -> "movie_db", "table" -> "rating_by_movie"))
    .save()

  // Tag every row with the batch time (ms) so downstream consumers can tell batches apart.
  ratingsDF
    .withColumn("batchtime", org.apache.spark.sql.functions.lit(batchTime.milliseconds))
    .rdd
}
// For every batch, append a text preview of at most its first 10 rows to `list`.
msgs.foreachRDD { batchRdd =>
  val preview = batchRdd.take(10).toList.map(_.toString)
  list.appendAll(preview)
}
// Plot the 10-second moving average of ratings, recomputed every 2 seconds.
// Each point is (window end time in ms, mean rating over the window).
// NOTE(review): `data` gains one point per slide for as long as the stream runs. Consider
// trimming old points if the job is long-lived.
val data = new collection.mutable.ArrayBuffer[(Long, Double)]()

// The parsed rows, grouped into 10s windows that slide every 2s.
val w = msgs.window(Seconds(10), Seconds(2))

w.foreachRDD { (rdd, t) =>
  // An empty window contains no ratings, so its "mean" is not an average of anything.
  // Skip it instead of plotting a meaningless point.
  if (!rdd.isEmpty) {
    val ndata = Seq((t.milliseconds, rdd.map(_.getAs[Float]("rating")).mean))
    org.apache.log4j.Logger.getLogger("lines").info("# " + ndata.size)
    line.addAndApply(data ++= ndata)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment