Created November 2, 2015 17:54
-
-
Save retroryan/367e177156f9561af473 to your computer and use it in GitHub Desktop.
Spark Streaming windowing example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.streaming.kafka.KafkaUtils | |
import org.apache.spark.streaming.Time | |
import kafka.serializer.StringDecoder | |
import org.joda.time.DateTime | |
import org.apache.spark.sql.SaveMode | |
import sqlContext.implicits._ | |
// Direct Kafka stream of (key, message) pairs.
// NOTE(review): ssc, kafkaParams and topics are defined outside this snippet.
val ratingsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

/*
 * For every batch:
 *  1. parse each Kafka message value into a Rating row,
 *  2. append the batch to the Cassandra table movie_db.rating_by_movie,
 *  3. return the rows with an extra "batchtime" column (the batch time in epoch millis)
 *     so downstream output operations can display and window them.
 */
val msgs = ratingsStream.transform { (message: RDD[(String, String)], batchTime: Time) =>
  // Each message value has the format user_id::movie_id::rating::timestamp
  val ratingsDF = message.map { case (_, nxtRating) =>
    val fields = nxtRating.split("::")
    // The timestamp is stored exactly as received (the former
    // `new DateTime(x).getMillis` round trip returned x unchanged).
    // NOTE(review): if the source timestamps are epoch seconds (as in the MovieLens
    // "::" format), they are stored unscaled — confirm whether millis were intended.
    Rating(fields(0).trim.toInt, fields(1).trim.toInt, fields(2).trim.toFloat, fields(3).trim.toLong)
  }.toDF("user_id", "movie_id", "rating", "timestamp")

  // To debug a batch, uncomment:
  // ratingsDF.show()

  // Append the batch to Cassandra.
  // Note: the Cassandra connection is configured through spark-env.sh, specifically
  // export SPARK_JAVA_OPTS=-Dspark.cassandra.connection.host=127.0.0.1
  // NOTE(review): this write is a side effect inside transform(); external writes are
  // normally placed in an output operation such as foreachRDD — consider moving it there.
  ratingsDF.write.format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append)
    .options(Map("keyspace" -> "movie_db", "table" -> "rating_by_movie"))
    .save()

  // Tag every row with the batch time so windowed consumers can tell batches apart.
  ratingsDF
    .withColumn("batchtime", org.apache.spark.sql.functions.lit(batchTime.milliseconds))
    .rdd
}
// Show the first ten rows of each batch by appending their string form to `list`.
// NOTE(review): `list` is defined outside this snippet — presumably a notebook-bound
// mutable display buffer; confirm.
msgs.foreachRDD(rdd => list.appendAll(rdd.take(10).toList.map(_.toString)))
// Plot the moving average of the "rating" column over a 10 s window that slides every 2 s.
// `data` accumulates one (window time in epoch millis, mean rating) point per non-empty window.
// NOTE(review): `line` is defined outside this snippet — presumably a notebook chart widget.
val data = new collection.mutable.ArrayBuffer[(Long, Double)]()
msgs.window(Seconds(10), Seconds(2)).foreachRDD { (rdd: RDD[org.apache.spark.sql.Row], t: Time) =>
  // Skip empty windows (e.g. before any ratings arrive): the mean of an empty RDD is NaN,
  // and it cannot be computed at all for an RDD with no partitions.
  if (!rdd.isEmpty()) {
    val ndata = Seq((t.milliseconds, rdd.map(_.getAs[Float]("rating")).mean()))
    org.apache.log4j.Logger.getLogger("lines").info("# " + ndata.size)
    line.addAndApply(data ++= ndata)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment