-
-
Save chidambaram005/719e76c46b86f26c5ca11d76203b43f8 to your computer and use it in GitHub Desktop.
Scala code for custom Partitioner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// We can meet this requirement, and any other partitioning need, by implementing a custom partitioner.
// Below is the sample code for the producer.
package com.knoldus | |
import java.util.Properties | |
import org.apache.kafka.clients.producer._ | |
/**
 * Sample producer that publishes twelve String records to the "ATM" topic:
 * keys "Credit0".."Credit5" followed by "Debit0".."Debit5".
 *
 * The producer is configured with `com.knoldus.CustomPartitioner`, so the
 * partition of each record is chosen from its key by that class.
 */
object KafkaProducer extends App {
  val props = new Properties()
  val topicName = "ATM"
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("partitioner.class", "com.knoldus.CustomPartitioner")

  // Fully qualified so the Kafka client class cannot be confused with this
  // enclosing object, which has the same simple name.
  val producer = new org.apache.kafka.clients.producer.KafkaProducer[String, String](props)

  // send() is asynchronous: a failed delivery is reported through the callback
  // (or the returned Future), not thrown at the call site. Without a callback
  // the try/catch below would never see those failures.
  private val reportFailure: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      Option(exception).foreach(_.printStackTrace())
  }

  try {
    // All "Credit" records are sent first, then all "Debit" records.
    for {
      transactionType <- Seq("Credit", "Debit")
      i <- 0 to 5
    } {
      val record = new ProducerRecord[String, String](
        topicName,
        transactionType + i,
        "My Site is knoldus.com " + i
      )
      producer.send(record, reportFailure)
    }
  } catch {
    // Synchronous failures of send() (e.g. an exception thrown by the partitioner).
    case e: Exception => e.printStackTrace()
  } finally {
    // close() waits for previously sent records to complete before returning.
    producer.close()
  }
}
// To create a custom partitioner, write a class that implements the Partitioner interface.
// The interface has three methods to implement:
// 1. configure
// 2. partition
// 3. close
// Sample Scala code implementing it is given below.
package com.knoldus | |
import java.util | |
import org.apache.kafka.common.record.InvalidRecordException | |
import org.apache.kafka.common.utils.Utils | |
import org.apache.kafka.clients.producer.Partitioner | |
import org.apache.kafka.common.Cluster | |
/**
 * Partitioner that splits a topic's partitions into two contiguous ranges by key prefix.
 *
 * Keys starting with [[Transactiontype]] ("Credit") are hashed onto the first
 * `floor(numPartitions * 0.4)` partitions; every other key is hashed onto the
 * remaining partitions. Within a range the partition is chosen with the
 * murmur2 hash of the serialized key.
 */
class CustomPartitioner extends Partitioner {

  /** Key prefix that selects the leading ("credit") partition range. */
  val Transactiontype = "Credit"

  /** No configuration is read. */
  override def configure(configs: util.Map[String, _]): Unit = {}

  /**
   * Chooses the partition for a record.
   *
   * @param topic      topic the record is being sent to
   * @param key        record key; must be a `String`
   * @param keyBytes   serialized key; must not be null
   * @param value      record value (unused)
   * @param valueBytes serialized value (unused)
   * @param cluster    cluster metadata used to look up the topic's partition count
   * @return the partition index for the record
   * @throws InvalidRecordException if the record has no serialized key or the key is not a `String`
   */
  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size

    val transactionKey = key match {
      case s: String if keyBytes != null => s
      case _ => throw new InvalidRecordException("All transaction must have Transaction type")
    }

    if (numPartitions <= 1) {
      // Nothing to split: a single partition receives every record.
      0
    } else {
      // For 2 partitions floor(n * 0.4) is 0, which would make the modulo below
      // divide by zero. Keep at least one partition in each range.
      val creditPartitions = math.min(math.max((numPartitions * 0.4).toInt, 1), numPartitions - 1)
      val hash = Utils.toPositive(Utils.murmur2(keyBytes))

      if (transactionKey.startsWith(Transactiontype)) hash % creditPartitions
      else creditPartitions + hash % (numPartitions - creditPartitions)
    }
  }

  /** Nothing to release. */
  override def close(): Unit = {}
}
// Below is the sample code for the consumer.
package com.knoldus | |
import java.util | |
import java.util.Properties | |
import scala.jdk.CollectionConverters._ | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
/**
 * Sample consumer that subscribes to the "ATM" topic as group "test" and
 * prints the topic, offset and partition of every record it receives.
 *
 * The poll loop never terminates on its own; the consumer is closed only
 * when an exception escapes the loop.
 */
object KafkaConsumer extends App {
  val props: Properties = new Properties()
  val topicName = "ATM"
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  // Explicit [String, String] matches the configured String deserializers; without
  // type arguments the key/value types would be inferred as Nothing.
  // Fully qualified so the Kafka client class cannot be confused with this
  // enclosing object, which has the same simple name.
  val consumer = new org.apache.kafka.clients.consumer.KafkaConsumer[String, String](props)

  try {
    consumer.subscribe(util.Arrays.asList(topicName))
    while (true) {
      // poll(Duration) replaces the deprecated poll(long) overload.
      val records = consumer.poll(java.time.Duration.ofMillis(10))
      for (record <- records.asScala) {
        println(s"Topic: ${record.topic()}, Offset: ${record.offset()}, Partition: ${record.partition()}")
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    consumer.close()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment