Skip to content

Instantly share code, notes, and snippets.

@visualskyrim
Created January 23, 2017 03:34
Show Gist options
  • Save visualskyrim/97c6e1aebec7073bae46b226c6c6d284 to your computer and use it in GitHub Desktop.
Save visualskyrim/97c6e1aebec7073bae46b226c6c6d284 to your computer and use it in GitHub Desktop.
/**
* Created by visualskyrim on 17/01/20.
*/
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * Minimal Spark Streaming word count over a Kafka topic.
 *
 * Subscribes to the Kafka topic `test` on `localhost:9092`, splits each record
 * value on single spaces, counts the occurrences of every token within each
 * one-second batch, and prints the per-batch counts.
 *
 * The entry point is an explicit `main` method rather than `extends App`:
 * Spark's documentation advises against `scala.App` because its deferred
 * initialization may not work correctly with Spark applications.
 */
object AQuickExample {

  /**
   * Builds the streaming pipeline, starts it, and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local mode with two threads, as configured by the "local[2]" master.
    val conf = new SparkConf().setMaster("local[2]").setAppName("AQuickExample")

    // Consumer configuration handed to the Kafka client. Auto-commit is
    // disabled and this example never commits offsets itself.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-playground-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // One-second micro-batches.
    val ssc = new StreamingContext(conf, Seconds(1))

    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("test"), kafkaParams)
    )

    // Per-batch word count: tokenize on a single space, pair each token
    // with 1, then sum the ones per token.
    val processedStream = inputStream
      .flatMap(record => record.value.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    processedStream.print()

    ssc.start()
    // Blocks the main thread until the streaming context is stopped or fails.
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment