Skip to content

Instantly share code, notes, and snippets.

@jpzk
Created July 11, 2019 06:22
Show Gist options
  • Save jpzk/0c74cb3686bbd050641ac873b32621aa to your computer and use it in GitHub Desktop.
Save jpzk/0c74cb3686bbd050641ac873b32621aa to your computer and use it in GitHub Desktop.
transactional producer kafka
package au.com.simplesteph.kafka.kafka0_11.demo
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.errors.OutOfOrderSequenceException
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.serialization.StringSerializer
object TransactionalProducer {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id") // this has to be set!!! (unique for each producer you're having)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // has to be idempotent
val producer = new KafkaProducer[String ,String](props, new StringSerializer, new StringSerializer)
producer.initTransactions()
try {
producer.beginTransaction()
for (i <- Range(0, 100)) {
producer.send(new ProducerRecord[String, String]("my-transactional-topic", Integer.toString(i), Integer.toString(i)))
producer.send(new ProducerRecord[String, String]("my-other-topic", Integer.toString(i), Integer.toString(i)))
}
producer.commitTransaction()
} catch {
case e@(_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close()
case e: KafkaException =>
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment