Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
public class TestPartitioner implements Partitioner{
@Override
public void configure(Map<String, ?> arg0) {
// Do nothing
}
@Override
public void close() {
// Do nothing
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
String strKey = (String)key;
String partitionToken = strKey.substring(0, strKey.indexOf('.')); // dept1 ~ dept5
return partitionToken.hashCode() % numPartitions;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment