Skip to content

Instantly share code, notes, and snippets.

@kykl
Created June 4, 2021 21:04
Show Gist options
  • Save kykl/58a98bf14bf7d73c228357715c46f56f to your computer and use it in GitHub Desktop.
Save kykl/58a98bf14bf7d73c228357715c46f56f to your computer and use it in GitHub Desktop.
/**
 * Publishes a single key/value record to a Kafka topic.
 *
 * A new `KafkaProducer` is created for every call and closed before the method
 * returns. Keys are serialized with `StringSerializer`; values are serialized
 * with Confluent's `KafkaProtobufSerializer`, using the schema registry at
 * `http://127.0.0.1:8081`. The broker address is hard-coded to `localhost:9999`.
 *
 * The result of `send` is not inspected, so a delivery failure reported through
 * the returned future is not surfaced to the caller.
 *
 * NOTE(review): constructing a producer per call is expensive; if this is called
 * frequently, consider sharing one producer instance — confirm against callers.
 *
 * @param topic name of the Kafka topic to publish to
 * @param key   record key
 * @param value record value; presumably a protobuf message, since the configured
 *              value serializer is the protobuf one — TODO confirm with callers
 * @tparam T type of the record value
 */
private def klog[T](topic: String, key: String, value: T): Unit = {
  import org.apache.kafka.clients.producer.ProducerConfig

  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
  props.put("schema.registry.url", "http://127.0.0.1:8081")

  val producer = new KafkaProducer[String, T](props)
  try {
    // Record construction and send both live inside the try so that any
    // exception they raise still reaches the finally block below.
    producer.send(new ProducerRecord[String, T](topic, key, value))
    ()
  } finally {
    // Always release the producer's network resources, even on failure.
    producer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment