@CaryBourgeois
Created November 3, 2015 02:55
Moving Average Example with Spark Kafka Direct API
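The Scala code below consumes semicolon-delimited sensor readings from the stream_ts Kafka topic, appends each reading to Cassandra, and maintains per-5-second event counts plus a three-window moving average of those counts. Each message value is expected to carry the six SensorEvent fields in order (edge id, sensor id, epoch hour, timestamp, depth, value), with the timestamp in a form java.sql.Timestamp.valueOf accepts. A plausible payload (the concrete values here are illustrative, not from the original gist) would be:

edge0;sensor1;2015110300;2015-11-03 00:15:00.000;10.5;72.3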
package com.datastax.demo
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Created by carybourgeois on 10/30/15.
*/
import com.datastax.spark.connector._
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import java.sql.Timestamp
import org.joda.time.{DateTimeZone, DateTime}
case class SensorEvent(edgeId: String, sensorId: String, epochHr: String, ts: Timestamp, depth: Double, value: Double)
// This implementation uses the Kafka Direct API supported in Spark 1.4
object SparkKafkaConsumer extends App {

  val checkpoint_path = "SparkKafkaConsumer"

  val conf = new SparkConf()
  val sc = SparkContext.getOrCreate(conf)
  println(s"Creating new SparkContext ${sc.getConf.getAppId}")

  val sqlContext = SQLContext.getOrCreate(sc)
  import sqlContext.implicits._

  val ssc = new StreamingContext(sc, Milliseconds(1000))
  ssc.checkpoint(checkpoint_path)
  println(s"Creating new StreamingContext ${ssc} with checkpoint path of: $checkpoint_path")

  val kafkaTopics = Set("stream_ts")
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

  // Create a direct (receiver-less) stream against the Kafka brokers; both keys and
  // values are decoded as plain strings.
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)

  // For each micro-batch, parse the semicolon-delimited message values into SensorEvents
  // and append them to the demo.data table in Cassandra.
  kafkaStream.foreachRDD {
    (message: RDD[(String, String)], batchTime: Time) => {
      val df = message.map {
        case (k, v) => v.split(";")
      }.map(payload => {
        val ts = Timestamp.valueOf(payload(3))
        SensorEvent(payload(0), payload(1), payload(2), ts, payload(4).toDouble, payload(5).toDouble)
      }).toDF("edge_id", "sensor", "epoch_hr", "ts", "depth", "value")

      df.show()

      df.write.format("org.apache.spark.sql.cassandra")
        .mode(SaveMode.Append)
        .options(Map("keyspace" -> "demo", "table" -> "data"))
        .save()

      println(s"${df.count()} rows processed.")
    }
  }

  // Count the events received in each non-overlapping 5-second window and append the
  // result to the demo.count table.
  kafkaStream
    .countByWindow(Milliseconds(5000), Milliseconds(5000))
    .foreachRDD {
      (message: RDD[Long]) => {
        val df = message.map(e => (1, new Timestamp((System.currentTimeMillis() / 1000) * 1000), e))
          .toDF("pk", "ts", "cnt")

        df.show()

        df.write.format("org.apache.spark.sql.cassandra")
          .mode(SaveMode.Append)
          .options(Map("keyspace" -> "demo", "table" -> "count"))
          .save()
      }
    }

  // Count the events in a 15-second window that slides every 5 seconds. Dividing the
  // count by 3 gives the moving average of the last three 5-second counts, which is
  // appended to the cnt_ma_3 column of demo.count.
  kafkaStream
    .countByWindow(Milliseconds(15000), Milliseconds(5000))
    .foreachRDD {
      (message: RDD[Long]) => {
        val df = message.map(e => (1, new Timestamp((System.currentTimeMillis() / 1000) * 1000), e / 3))
          .toDF("pk", "ts", "cnt_ma_3")

        df.show()

        df.write.format("org.apache.spark.sql.cassandra")
          .mode(SaveMode.Append)
          .options(Map("keyspace" -> "demo", "table" -> "count"))
          .save()
      }
    }

  ssc.start()
  ssc.awaitTermination()
}
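The gist does not include the Cassandra schema the writes target. A minimal sketch that matches the keyspace, table, and column names used above could look like the following; the primary-key choices and column types are assumptions, not part of the original example. It reuses the CassandraConnector already available through the connector imports.

// Hypothetical schema setup; key and type choices are assumptions.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute(
    """CREATE KEYSPACE IF NOT EXISTS demo
      |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""".stripMargin)
  session.execute(
    """CREATE TABLE IF NOT EXISTS demo.data (
      |  edge_id text, sensor text, epoch_hr text, ts timestamp, depth double, value double,
      |  PRIMARY KEY ((edge_id, sensor, epoch_hr), ts))""".stripMargin)
  session.execute(
    """CREATE TABLE IF NOT EXISTS demo.count (
      |  pk int, ts timestamp, cnt bigint, cnt_ma_3 bigint,
      |  PRIMARY KEY (pk, ts))""".stripMargin)
}

With a layout like this, both windowed writes target the same demo.count table, each supplying its own count column for the shared (pk, ts) key.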