Skip to content

Instantly share code, notes, and snippets.

@vthacker
Created March 11, 2015 09:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vthacker/971d7f2d3ff472746b53 to your computer and use it in GitHub Desktop.
Save vthacker/971d7f2d3ff472746b53 to your computer and use it in GitHub Desktop.
getBestCreateUrl AutoAddReplicas
static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica, Integer maxCoreCount) {
assert badReplica != null;
assert badReplica.collection != null;
assert badReplica.slice != null;
Map<String,Counts> counts = new HashMap<>();
ValueComparator vc = new ValueComparator(counts);
Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
Map<String, Integer> coresPerNode = new HashMap<>();
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
Set<String> collections = clusterState.getCollections();
for (String collection : collections) {
log.debug("look at collection {} as possible create candidate", collection);
DocCollection docCollection = clusterState.getCollection(collection);
// TODO - only operate on collections with sharedfs failover = true ??
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
// only look at active shards
if (slice.getState().equals(Slice.ACTIVE)) {
log.debug("look at slice {} as possible create candidate", slice.getName());
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
liveNodes.remove(replica.getNodeName());
if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(
badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
continue;
}
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
if (coresPerNode.containsKey(baseUrl)) {
Integer nodeCount = coresPerNode.get(baseUrl);
coresPerNode.put(baseUrl, nodeCount++);
} else {
coresPerNode.put(baseUrl, 1);
}
// on a live node?
log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes());
boolean live = clusterState.liveNodesContain(replica.getNodeName());
log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live);
if (live) {
Counts cnt = counts.get(baseUrl);
if (cnt == null) {
cnt = new Counts();
}
if (badReplica.collection.getName().equals(collection)) {
cnt.negRankingWeight += 3;
cnt.collectionShardsOnNode += 1;
} else {
cnt.negRankingWeight += 1;
}
if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
cnt.ourReplicas++;
}
int maxShardsPerNode = docCollection.getMaxShardsPerNode();
log.debug("maxShardsPerNode={} maxCoresPerNode={} good replicas={}", maxShardsPerNode, maxCoreCount, cnt);
Collection<Replica> badSliceReplicas = null;
DocCollection c = clusterState.getCollection(badReplica.collection.getName());
if (c != null) {
Slice s = c.getSlice(badReplica.slice.getName());
if (s != null) {
badSliceReplicas = s.getReplicas();
}
}
boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
if (alreadyExistsOnNode || (maxCoreCount != null && coresPerNode.get(baseUrl) >= maxCoreCount) ||
cnt.collectionShardsOnNode >= maxShardsPerNode) {
counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP));
} else {
counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt);
}
}
}
}
}
}
}
for (String node : liveNodes) {
counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
}
if (counts.size() == 0) {
return null;
}
Map<String,Counts> sortedCounts = new TreeMap<>(vc);
sortedCounts.putAll(counts);
log.debug("empty nodes={}", liveNodes);
log.debug("sorted hosts={}", sortedCounts);
return sortedCounts.keySet().iterator().next();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment