Skip to content

Instantly share code, notes, and snippets.

@natewave
Created September 14, 2015 09:59
Show Gist options
  • Save natewave/004bf19204d122eafa04 to your computer and use it in GitHub Desktop.
Save natewave/004bf19204d122eafa04 to your computer and use it in GitHub Desktop.
class KafkaCustomDecoder[T](implicit keyRule: RuleLike[AvroValue, T]) extends kafka.serializer.Decoder[VA[T]] with Logging {
import kafka.message.Message
import java.util.Properties
def fromAvro(value: GenericRecord) =
Avro.fromAvro[T](Avro.wrap(value))
def fromBytes(bytes: Array[Byte]): VA[T] = {
val props: Properties = new Properties()
props.put("schema.registry.url", "http://localhost:8081")
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val vProps: kafka.utils.VerifiableProperties = new kafka.utils.VerifiableProperties(props);
val avroDecoder = new KafkaAvroDecoder(vProps)
val genericRecord: GenericRecord = avroDecoder.fromBytes(bytes).asInstanceOf[GenericRecord]
val decoded: VA[T] = Avro.fromAvro[T](Avro.wrap(genericRecord))
decoded
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment