Skip to content

Instantly share code, notes, and snippets.

@Rohithyeravothula
Created June 22, 2017 10:34
Show Gist options
  • Save Rohithyeravothula/373b76ee58f7499c960cf39c67518063 to your computer and use it in GitHub Desktop.
Save Rohithyeravothula/373b76ee58f7499c960cf39c67518063 to your computer and use it in GitHub Desktop.
Custom Kafka value deserializer for a Spark Streaming (kafka010) consumer
/** Spark Streaming demo: consumes `"<id>,<name>"` records from the Kafka topic "black"
  * through a custom value deserializer and prints every micro-batch on the driver.
  *
  * NOTE(review): Spark's documentation recommends defining `def main(args: Array[String])`
  * rather than extending `scala.App`, because `App`'s delayed initialization can leave object
  * fields uninitialized inside closures shipped to executors. None of the closures below
  * capture fields of this object, so `App` is kept to preserve the existing entry point.
  * `ssc` is presumably a `StreamingContext` supplied by `SparkEngine` — confirm.
  */
object Test extends App with SparkEngine {
  import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010._

  import java.nio.charset.StandardCharsets
  import scala.util.Try

  /** A car record decoded from a Kafka message value of the form `"<id>,<name>"`. */
  final case class Car(id: Int, name: String)

  /** Kafka value deserializer that turns UTF-8 `"<id>,<name>"` payloads into [[Car]]s.
    *
    * A null payload, a payload with fewer than two comma-separated fields, or a payload
    * whose first field is not an integer decodes to the placeholder `Car(0, "nothing")`.
    * It does not throw, so one bad record does not fail the whole micro-batch.
    */
  class CarDeserialization extends Deserializer[Car] {

    /** Placeholder returned for any payload that cannot be decoded. */
    private val Unknown = Car(0, "nothing")

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

    override def close(): Unit = {}

    /** Decodes one record value.
      *
      * @param topic topic the record came from (unused)
      * @param raw   serialized value; may be null (e.g. a tombstone record)
      * @return the decoded car, or the placeholder when `raw` is null or malformed
      */
    override def deserialize(topic: String, raw: Array[Byte]): Car =
      Option(raw).fold(Unknown) { bytes =>
        // String.split(",") drops trailing empty fields, so "1," yields a single field.
        val items = new String(bytes, StandardCharsets.UTF_8).split(",")
        if (items.length < 2) Unknown
        else Try(items(0).trim.toInt).map(Car(_, items(1))).getOrElse(Unknown)
      }
  }

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[CarDeserialization],
    "auto.offset.reset" -> "latest",
    "group.id" -> "local",
    // Auto-commit is off and this demo never calls commitAsync, so no offsets are stored
    // for this group by this code; restarts fall back to "auto.offset.reset".
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("black")

  val stream = KafkaUtils.createDirectStream[String, Car](
    ssc, PreferConsistent, Subscribe[String, Car](topics, kafkaParams))

  // collect() pulls each entire micro-batch to the driver: fine for a local demo,
  // unsuitable for real data volumes.
  stream.map(_.value()).foreachRDD { rdd =>
    println(rdd.collect().toList)
  }

  ssc.start()
  ssc.awaitTermination()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment