Skip to content

Instantly share code, notes, and snippets.

@chadselph
Created February 3, 2023 02:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chadselph/9e6c3d6d85eb158393df7adb41157bf3 to your computer and use it in GitHub Desktop.
Save chadselph/9e6c3d6d85eb158393df7adb41157bf3 to your computer and use it in GitHub Desktop.
Reading data from confluent schema registry without using their library
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.apache.kafka.common.utils.ByteUtils
/**
* Not recommended for production but useful when you want to write some kind of
* quick script that reads data off kafka.
*/
object ReadData {
type MyType = ??? // some protobuf type
val myTypeDeserializer: Deserializer[MyType] = { (topic, data) =>
val bb = ByteBuffer.wrap(data)
bb.get // magic number
bb.getInt // version number
val indexes = ByteUtils.readVarint(bb)
for (_ <- Range(0, indexes)) ByteUtils.readVarint(bb)
MyType.parseFrom(bb) // assuming Protobuf here
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment