Skip to content

Instantly share code, notes, and snippets.

@geomagilles
Last active December 14, 2020 21:35
Show Gist options
  • Save geomagilles/7415047bd224102334f56da88bce3d2b to your computer and use it in GitHub Desktop.
Save geomagilles/7415047bd224102334f56da88bce3d2b to your computer and use it in GitHub Desktop.
Building Pulsar Producer<T> and Consumer<T> in Kotlin using native (kotlinx.serialization) serialization
import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.ByteArrayOutputStream
import java.io.InputStream
/**
 * Serializes [t] to Avro bytes with avro4k, using the schema derived from [serializer].
 *
 * The stream is opened with [AvroEncodeFormat.Binary], i.e. the schemaless binary form:
 * the result holds only the encoded datum, so whoever decodes it must already know the
 * schema (as [readBinary] does).
 *
 * @param t the value to encode.
 * @param serializer kotlinx.serialization serializer for [T]; also the source of the Avro schema.
 * @return the encoded bytes.
 */
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray {
    val avroFormat = Avro.default
    val buffer = ByteArrayOutputStream()
    val avroStream = avroFormat.openOutputStream(serializer) {
        encodeFormat = AvroEncodeFormat.Binary
        schema = avroFormat.schema(serializer)
    }.to(buffer)
    avroStream.write(t)
    // closing flushes the encoder into the buffer before it is read out
    avroStream.close()
    return buffer.toByteArray()
}
/**
 * Decodes schemaless Avro [bytes] (as produced by [writeBinary]) back into a [T].
 *
 * The bytes are first read as a [GenericRecord] against the schema derived from
 * [serializer], then converted to [T] by avro4k.
 *
 * @param bytes a single Avro binary datum, without any schema header.
 * @param serializer kotlinx.serialization serializer for [T]; also the source of the reader schema.
 * @return the decoded value.
 */
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T {
    val readerSchema = Avro.default.schema(serializer)
    val binaryDecoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)
    val genericRecord = GenericDatumReader<GenericRecord>(readerSchema).read(null, binaryDecoder)
    return Avro.default.fromRecord(serializer, genericRecord)
}
/**
 * Pulsar [SchemaReader] that decodes [RunTask] payloads from schemaless Avro binary
 * via [readBinary].
 *
 * NOTE(review): only the non-versioned `read` overloads are overridden, so any Pulsar
 * overload taking a schema version falls back to its default. The schema definition
 * enables schema versioning; confirm whether version-aware decoding is needed.
 */
class RunTaskSchemaReader : SchemaReader<RunTask> {
    override fun read(bytes: ByteArray, offset: Int, length: Int): RunTask {
        // view only the [offset, offset + length) window of the buffer
        val window = bytes.inputStream(offset, length)
        return read(window)
    }

    override fun read(inputStream: InputStream): RunTask {
        val payload = inputStream.readBytes()
        return readBinary(payload, RunTask.serializer())
    }
}
/**
 * Pulsar [SchemaWriter] that encodes [RunTask] messages as schemaless Avro binary
 * via [writeBinary].
 */
class RunTaskSchemaWriter : SchemaWriter<RunTask> {
    override fun write(message: RunTask): ByteArray {
        val runTaskSerializer = RunTask.serializer()
        return writeBinary(message, runTaskSerializer)
    }
}
/**
 * Builds the Pulsar [SchemaDefinition] for [RunTask].
 *
 * The JSON schema definition is the avro4k-generated Avro schema of [RunTask]. The
 * actual (de)serialization is delegated to [RunTaskSchemaReader] and [RunTaskSchemaWriter],
 * and schema versioning support is enabled.
 */
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> {
    val avroJsonSchema = Avro.default.schema(RunTask.serializer()).toString()
    return SchemaDefinition.builder<RunTask>()
        .withJsonDef(avroJsonSchema)
        .withSchemaReader(RunTaskSchemaReader())
        .withSchemaWriter(RunTaskSchemaWriter())
        .withSupportSchemaVersioning(true)
        .build()
}
/**
 * Creates a [Producer] of [RunTask] messages that encodes them with the Avro schema
 * from [runTaskSchemaDefinition].
 *
 * @param client Pulsar client used to build the producer.
 * @param topic topic to publish to; defaults to the previously hard-coded topic.
 * @return a ready-to-use producer.
 * @throws org.apache.pulsar.client.api.PulsarClientException if the producer cannot be created.
 */
fun runTaskProducer(
    client: PulsarClient,
    topic: String = "some-avro-topic"
): Producer<RunTask> = client
    .newProducer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic(topic)
    .create()
/**
 * Creates a [Consumer] of [RunTask] messages that decodes them with the Avro schema
 * from [runTaskSchemaDefinition].
 *
 * @param client Pulsar client used to build the consumer.
 * @param topic topic to subscribe to; defaults to the previously hard-coded topic.
 * @param subscriptionName subscription to attach to. Pulsar's consumer builder rejects
 *   `subscribe()` when no subscription name is configured, so one is always set here.
 * @return a subscribed consumer.
 * @throws org.apache.pulsar.client.api.PulsarClientException if the subscription fails.
 */
fun runTaskConsumer(
    client: PulsarClient,
    topic: String = "some-avro-topic",
    subscriptionName: String = "run-task-subscription"
): Consumer<RunTask> = client
    .newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic(topic)
    .subscriptionName(subscriptionName)
    .subscribe()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment