Skip to content

Instantly share code, notes, and snippets.

@mageddo
Created March 27, 2019 18:39
Show Gist options
  • Save mageddo/343e3da94be8305f200d7d2111fef6d5 to your computer and use it in GitHub Desktop.
Save mageddo/343e3da94be8305f200d7d2111fef6d5 to your computer and use it in GitHub Desktop.
Kafka Round Robin
/**
 * Running tally of how many keys landed on one partition.
 *
 * <p>Both fields are package-visible and mutable: there is no setter, so the
 * counter is meant to be bumped directly through the {@code count} field.
 */
static class Partition {

    /** Number of the partition this tally describes. */
    int partition;

    /** How many keys have been counted for this partition; starts at zero. */
    int count = 0;

    /**
     * Creates an empty tally for the given partition.
     *
     * @param partition the partition number
     */
    public Partition(int partition) {
        this.partition = partition;
    }

    /**
     * @return the number of keys counted so far
     */
    public int getCount() {
        return this.count;
    }

    /**
     * @return a compact {@code { p: <partition>, c: <count> }} rendering
     */
    @Override
    public String toString() {
        final Object[] values = {this.partition, this.count};
        return String.format("{ p: %d, c: %d }", values);
    }
}
/**
 * Compares how two key encodings distribute a fixed list of imported-file ids
 * across {@code partitions} partitions.
 *
 * <p>For every id, two partition numbers are computed with {@code calcPartition}:
 * <ul>
 *   <li>"A": from the bytes of the id's decimal string;</li>
 *   <li>"B": from {@code DigestUtils.md2} of the same string
 *       (presumably Apache Commons Codec - confirm against the file's imports).</li>
 * </ul>
 * Each id's two partitions are printed as they are computed; afterwards the
 * per-partition tallies of both strategies are printed, highest count first.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final int partitions = 20;
    final List<Integer> importedFileIds = List.of(
        72374,72373,72372,72371,72368,72369,72370,72367,72366,72365,72363,72364,72362,72361,72360,72359,72358,
        72357,72356,72355,72354,72353,72352,72351,72350,72349,72348,72347,72346,72345,72344,72343,72342,72341,72340,
        72339,72337,72338,72336,72335,72334,72333,72332,72331,72330,72329,72327,72328,72325,72326,72324,72323,72322,
        72319,72320,72321,72318,72317,72316,72315,72314,72313
    );

    final Map<Integer, Partition> mapA = new HashMap<>();
    final Map<Integer, Partition> mapB = new HashMap<>();
    for (final int importedFileId : importedFileIds) {
        final String importedFileStr = String.valueOf(importedFileId);

        // Pin the charset: a bare getBytes() uses the platform default, which would make
        // the hashed bytes (and therefore the partition) depend on the JVM's configuration.
        final int aPartition = calcPartition(
            importedFileStr.getBytes(java.nio.charset.StandardCharsets.UTF_8), partitions
        );
        final int bPartition = calcPartition(DigestUtils.md2(importedFileStr), partitions);

        // Create the tally on first sight of a partition, then bump it.
        mapA.computeIfAbsent(aPartition, Partition::new).count++;
        mapB.computeIfAbsent(bPartition, Partition::new).count++;

        System.out.printf(
            "importedFileId=%d, partition=%d, partition=%d %n", importedFileId, aPartition, bPartition
        );
    }

    // Busiest partitions first; the sort is stable, so ties keep the map's iteration order.
    final Comparator<Partition> busiestFirst = Comparator.comparingInt(Partition::getCount).reversed();
    final List<Partition> aPartitions = new ArrayList<>(mapA.values());
    final List<Partition> bPartitions = new ArrayList<>(mapB.values());
    aPartitions.sort(busiestFirst);
    bPartitions.sort(busiestFirst);

    System.out.println(aPartitions);
    System.out.println(bPartitions);
}
/**
 * Derives a partition index from a key: hashes the key with {@code Utils.murmur2},
 * passes the hash through {@code Utils.toPositive}, and takes the remainder
 * modulo {@code partitions}.
 *
 * <p>NOTE(review): {@code Utils} looks like Kafka's
 * {@code org.apache.kafka.common.utils.Utils}; confirm against the file's imports.
 *
 * @param key        raw key bytes to hash
 * @param partitions number of partitions to spread keys over
 * @return the computed partition index
 */
private static int calcPartition(byte[] key, int partitions) {
    final int hash = Utils.murmur2(key);
    final int positiveHash = Utils.toPositive(hash);
    return positiveHash % partitions;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment