Skip to content

Instantly share code, notes, and snippets.

@sfalquier
Last active November 11, 2020 15:50
Show Gist options
  • Save sfalquier/4c0c7f36dd96d642b416 to your computer and use it in GitHub Desktop.
Save sfalquier/4c0c7f36dd96d642b416 to your computer and use it in GitHub Desktop.
Kafka Custom Partitioner
import kafka.producer._
import kafka.utils._
import java.util._
import java.text._
import java.util.concurrent.atomic._
class KafkaPartitioner(props: VerifiableProperties = null) extends Partitioner {
val counter = new AtomicInteger(0)
val batch = new AtomicInteger(0)
val partition = new AtomicInteger(0)
def partition(key: Any, numPartitions: Int): Int = {
//round robin partitioner to smooth producers' traffic on all partitions
//change partition every X messages where X corresponds to kafka producer batch message size
if(batch.incrementAndGet % BATCH_NUM_MESSAGES == 1) {
partition.set(math.abs(counter.incrementAndGet) % numPartitions)
batch.set(0)
}
partition.get
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment