Created June 22, 2017 10:34
-
-
Save Rohithyeravothula/373b76ee58f7499c960cf39c67518063 to your computer and use it in GitHub Desktop.
Custom Kafka value deserializer for a Spark Streaming consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Spark Streaming demo that consumes the Kafka topic "black" and decodes each
 * record value into a [[Test.Car]] using a custom Kafka `Deserializer`.
 *
 * NOTE(review): `ssc` (the streaming context) is not defined here; presumably
 * it is provided by `SparkEngine` — confirm.
 */
object Test extends App with SparkEngine {
  import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010._

  /** A record decoded from a comma-separated "id,name" payload. */
  case class Car(id: Int, name: String)

  /**
   * Kafka value deserializer that turns a UTF-8 "id,name" payload into a [[Car]].
   *
   * Kafka instantiates this class reflectively from the `value.deserializer`
   * setting, so it must keep its public no-argument constructor.
   *
   * A payload decodes to `Car(0, "nothing")` when it is null, has fewer than
   * two comma-separated fields, or has a first field that is not an integer.
   */
  class CarDeserialization extends Deserializer[Car] {

    /** Value returned for records that cannot be decoded. */
    private val Fallback = Car(0, "nothing")

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, raw: Array[Byte]): Car =
      // Kafka may hand a deserializer null data (e.g. tombstone records);
      // decode that to the fallback instead of throwing a NullPointerException.
      Option(raw).fold(Fallback) { bytes =>
        val data = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
        println(s"data is $data")
        data.split(",") match {
          case Array(id, name, _*) =>
            // Parse the id field itself (not its character count); bad ids fall back.
            scala.util.Try(id.trim.toInt).map(Car(_, name)).getOrElse(Fallback)
          case _ => Fallback
        }
      }
  }

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[CarDeserialization],
    "auto.offset.reset" -> "latest",
    "group.id" -> "local",
    // Auto-commit is disabled, and this demo never commits offsets itself.
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("black")

  val stream = KafkaUtils.createDirectStream[String, Car](
    ssc,
    PreferConsistent,
    Subscribe[String, Car](topics, kafkaParams)
  )

  // Project to the decoded value before collecting to the driver:
  // ConsumerRecord itself is not serializable.
  stream.map(_.value()).foreachRDD { rdd =>
    println(rdd.collect().toList)
  }

  ssc.start()
  ssc.awaitTermination()
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment