package imaifactory.examples

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object JsonStreamConsumerExample extends App {

  val appName     = "APP_NAME"
  val streamName  = "STREAM_NAME"
  val endpointUrl = "https://kinesis.REGION.amazonaws.com"
  val region      = "REGION"
  val position    = InitialPositionInStream.LATEST

  val conf  = new SparkConf().setAppName(appName).setMaster("local[*]")
  val ssc   = new StreamingContext(conf, Seconds(10))
  val spark = SparkSession.builder.config(conf).getOrCreate()

  // Receive records from the Kinesis stream as a DStream[Array[Byte]],
  // checkpointing every 10 seconds.
  val kinesisStream = KinesisUtils.createStream(
    ssc, appName, streamName, endpointUrl,
    region, position, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)

  // For each micro-batch, decode the raw record bytes as JSON strings,
  // load them into a DataFrame, and expose it as a temporary view for SQL.
  kinesisStream.foreachRDD { rdd =>
    val df = spark.read.json(rdd.map(record => new String(record)))
    if (df.count() > 0) {
      df.createOrReplaceTempView("bx1")
      spark.sql("select * from bx1").show
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
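To build this, the spark-streaming-kinesis-asl artifact (which provides KinesisUtils and pulls in the Kinesis Client Library) must be on the classpath alongside spark-sql and spark-streaming. A minimal build.sbt sketch; the Scala and Spark versions below are assumptions, so match them to your cluster:

// build.sbt -- illustrative versions only
name := "json-stream-consumer-example"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"                   % "2.1.0",
  "org.apache.spark" %% "spark-streaming"             % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.1.0"
)

Alternatively, when submitting to a cluster that already ships Spark, the Kinesis connector can be pulled in at submit time with --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0 (again, adjust the version to your Spark build).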