Skip to content

Instantly share code, notes, and snippets.

@allenxwang
Created August 15, 2017 18:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allenxwang/af2f5b3251973debc4dc7b8144ae951f to your computer and use it in GitHub Desktop.
Save allenxwang/af2f5b3251973debc4dc7b8144ae951f to your computer and use it in GitHub Desktop.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Abstract assignor implementation which does some common grunt work (in particular collecting
* partition counts which are always needed in assignors).
*/
public abstract class AbstractPartitionAssignor implements PartitionAssignor {
private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
/**
* Perform the group assignment given the partition counts and member subscriptions
* @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
* from this map.
* @param subscriptions Map from the memberId to their respective topic subscription
* @return Map from each member to the list of partitions assigned to them.
*/
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, List<String>> subscriptions);
@Override
public Subscription subscription(Set<String> topics) {
return new Subscription(new ArrayList<>(topics));
}
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, List<String>> topicSubscriptions = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
List<String> topics = subscriptionEntry.getValue().topics();
allSubscribedTopics.addAll(topics);
topicSubscriptions.put(subscriptionEntry.getKey(), topics);
}
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : allSubscribedTopics) {
Integer numPartitions = metadata.partitionCountForTopic(topic);
List<PartitionInfo> partitionInfos = metadata.partitionsForTopic(topic);
if (numPartitions != null && numPartitions > 0 && partitionInfos != null)
partitionsPerTopic.put(topic, numPartitions);
else
log.debug("Skipping assignment for topic {} since no metadata is available", topic);
}
// Note: because partitionsPerTopic only represents the number of partitions,
// the list of TopicPartition returned here does not corresponds to the actual partition
// number on the cluster if the partition starts from non-zero. The partition here can be
// interpreted as an index to the real partition
Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
// this class has maintains no user data, so just wrap the results
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet()) {
List<TopicPartition> rawPartitions = assignmentEntry.getValue();
List<TopicPartition> actualPartitions = new ArrayList<>();
// Transform the partition number to the actual number on the cluster by using it
// as an index
for (TopicPartition topicPartition: rawPartitions) {
int partition = topicPartition.partition();
String topic = topicPartition.topic();
List<PartitionInfo> partitionInfos = metadata.partitionsForTopic(topic);
if (partitionInfos != null) {
actualPartitions.add(new TopicPartition(topic, partitionInfos.get(partition).partition()));
}
}
assignments.put(assignmentEntry.getKey(), new Assignment(actualPartitions));
}
return assignments;
}
@Override
public void onAssignment(Assignment assignment) {
// this assignor maintains no internal state, so nothing to do
}
protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
List<V> list = map.get(key);
if (list == null) {
list = new ArrayList<>();
map.put(key, list);
}
list.add(value);
}
protected static List<TopicPartition> partitions(String topic, int numPartitions) {
List<TopicPartition> partitions = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++)
partitions.add(new TopicPartition(topic, i));
return partitions;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment