Skip to content

Instantly share code, notes, and snippets.

@chidambaram005
Last active March 12, 2024 19:24
Show Gist options
  • Save chidambaram005/719e76c46b86f26c5ca11d76203b43f8 to your computer and use it in GitHub Desktop.
Save chidambaram005/719e76c46b86f26c5ca11d76203b43f8 to your computer and use it in GitHub Desktop.
Scala code for custom Partitioner
--We can solve this requirement, and any other custom partitioning need, by implementing a custom partitioner.
--Below is the sample code for the producer.
package com.knoldus
import java.util.Properties
import org.apache.kafka.clients.producer._
object KafkaProducer extends App {
  // Producer configuration: two local brokers, String key/value serializers,
  // and the custom partitioner that separates Credit and Debit keys.
  val props = new Properties()
  val topicName = "ATM"
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("partitioner.class", "com.knoldus.CustomPartitioner")

  val producer = new KafkaProducer[String, String](props)
  try {
    // Publish six Credit-keyed records followed by six Debit-keyed records.
    for {
      keyPrefix <- Seq("Credit", "Debit")
      i <- 0 to 5
    } {
      val record = new ProducerRecord[String, String](topicName, keyPrefix + i, "My Site is knoldus.com " + i)
      producer.send(record)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Flushes buffered records and releases network resources.
    producer.close()
  }
}
--We need to create our class by implementing the Partitioner interface. There are three methods to implement, as given
--below:
--1.Configure.
--2.Partition.
--3.Close.
--Sample scala code is given below to implement this
package com.knoldus
import java.util
import org.apache.kafka.common.record.InvalidRecordException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
/**
 * Routes records whose key starts with "Credit" to the first ~40% of the
 * topic's partitions and all other keyed records (e.g. "Debit...") to the
 * remaining partitions.
 *
 * NOTE(review): assumes the topic has at least 2 partitions; with a single
 * partition the non-credit branch would divide by zero — confirm topic setup.
 */
class CustomPartitioner extends Partitioner {
  // Key prefix that selects the credit partition range.
  // (Original declared `Transactiontype` but referenced `TransactionType`,
  // which does not compile — the name is now consistent.)
  val TransactionType = "Credit"

  /** No extra configuration is needed. */
  override def configure(configs: util.Map[String, _]): Unit = {}

  /**
   * Chooses the partition for a record.
   *
   * @throws InvalidRecordException if the record has no String key — the
   *                                key is required to carry the transaction type.
   */
  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val partitions = cluster.partitionsForTopic(topic)
    val numPartitions = partitions.size
    // Reserve ~40% of the partitions (at least one) for credit records.
    // `toInt` replaces the original `asInstanceOf[Int]` cast on a Double;
    // `max(1, …)` prevents `% 0` when numPartitions * 0.4 truncates to 0.
    // (Original also referenced lowercase `credit`, a compile error.)
    val creditPartitions = math.max(1, (numPartitions * 0.4).toInt)
    if ((keyBytes == null) || (!key.isInstanceOf[String]))
      throw new InvalidRecordException("All transaction must have Transaction type")
    if (key.asInstanceOf[String].startsWith(TransactionType)) {
      // Hash into the credit range [0, creditPartitions).
      Utils.toPositive(Utils.murmur2(keyBytes)) % creditPartitions
    } else {
      // Hash into the remaining range [creditPartitions, numPartitions).
      Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - creditPartitions) + creditPartitions
    }
  }

  /** No resources to release. */
  override def close(): Unit = {}
}
--Below is the sample code for the consumer.
package com.knoldus
import java.util
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
object KafkaConsumer extends App {
  // Consumer configuration: group "test", two local brokers,
  // String key/value deserializers.
  val props: Properties = new Properties()
  val topicName = "ATM"
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  // Explicit type parameters: the original `new KafkaConsumer(props)` let
  // inference pick KafkaConsumer[Nothing, Nothing].
  val consumer = new KafkaConsumer[String, String](props)
  try {
    consumer.subscribe(util.Arrays.asList(topicName))
    // Poll forever, printing the placement of each received record.
    while (true) {
      // poll(long) is deprecated (removed in newer clients); use the
      // Duration overload, fully qualified so the imports stay untouched.
      val records = consumer.poll(java.time.Duration.ofMillis(10))
      for (record <- records.asScala) {
        println("Topic: " + record.topic() + ", Offset: " + record.offset() + ", Partition: " + record.partition())
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Commits offsets (per config defaults) and leaves the group cleanly.
    consumer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment