Skip to content

Instantly share code, notes, and snippets.

@nsoft
Last active November 3, 2017 02:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nsoft/397da0bafbf4f59acd793218472c3710 to your computer and use it in GitHub Desktop.
Save nsoft/397da0bafbf4f59acd793218472c3710 to your computer and use it in GitHub Desktop.
SOLR-11487-watch.patch -- Patch vs master as of ~9pm Nov 2 2017
Index: solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java (date 1509475828000)
+++ solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java (date 1508764667000)
@@ -21,9 +21,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.common.SolrException;
@@ -32,8 +30,6 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,20 +53,7 @@
ZkStateReader zkStateReader = ocmh.zkStateReader;
validateAllCollectionsExistAndNoDups(collections, zkStateReader);
- byte[] jsonBytes = zkStateReader.getAliases().cloneWithCollectionAlias(aliasName, collections).toJSON();
- try {
- zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
-
- checkForAlias(aliasName, collections);
- // some fudge for other nodes
- Thread.sleep(100);
- } catch (KeeperException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- log.warn("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
+ zkStateReader.exportAliasToZk(aliasName, collections);
}
private void validateAllCollectionsExistAndNoDups(String collections, ZkStateReader zkStateReader) {
@@ -89,18 +72,5 @@
}
}
- private void checkForAlias(String name, String value) {
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
- boolean success = false;
- while (!timeout.hasTimedOut()) {
- String collections = ocmh.zkStateReader.getAliases().getCollectionAliasMap().get(name);
- if (Objects.equals(collections, value)) {
- success = true;
- break;
- }
- }
- if (!success) {
- log.warn("Timeout waiting to be notified of Alias change...");
- }
- }
+
}
Index: solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (date 1509475828000)
+++ solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (date 1508764667000)
@@ -17,6 +17,8 @@
package org.apache.solr.cloud;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import org.apache.solr.client.solrj.SolrQuery;
@@ -28,6 +30,9 @@
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.BeforeClass;
@@ -41,6 +46,95 @@
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
+
+ /**
+ * Exercises alias metadata handling: creates two collections, aliases them as "meta1", then adds,
+ * extends and removes metadata keys via {@code cloneWithCollectionAliasMetadata} followed by
+ * {@code exportAllAliases}, checking the reader's view after each step. Finally a second,
+ * independently constructed ZkStateReader is used to confirm that the same state is visible
+ * through a separate ZooKeeper client.
+ */
+ @Test
+ public void testMetadata() throws Exception {
+ CollectionAdminRequest.createCollection("collection1meta", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("collection2meta", "conf", 1, 1).process(cluster.getSolrClient());
+ waitForState("Expected collection1 to be created with 2 shards and 1 replica", "collection1meta", clusterShape(2, 1));
+ waitForState("Expected collection2 to be created with 1 shard and 1 replica", "collection2meta", clusterShape(1, 1));
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ zkStateReader.createClusterStateWatchersAndUpdate();
+ // no alias named "meta1" exists yet, so the name is expected to resolve to itself
+ List<String> aliases = zkStateReader.getAliases().resolveAliases("meta1");
+ assertEquals(1, aliases.size());
+ assertEquals("meta1", aliases.get(0));
+ zkStateReader.exportAllAliases(zkStateReader.getAliases().cloneWithCollectionAlias("meta1", "collection1meta,collection2meta"));
+ aliases = zkStateReader.getAliases().resolveAliases("meta1");
+ assertEquals(2, aliases.size());
+ assertEquals("collection1meta", aliases.get(0));
+ assertEquals("collection2meta", aliases.get(1));
+
+ // set metadata
+ Aliases cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", "bar");
+ zkStateReader.exportAllAliases(cloned);
+ Map<String, String> meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
+ assertNotNull(meta);
+ assertTrue(meta.containsKey("foo"));
+ assertEquals("bar", meta.get("foo"));
+
+ // set more metadata
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foobar", "bazbam");
+ zkStateReader.exportAllAliases(cloned);
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
+ assertNotNull(meta);
+
+ // old metadata still there
+ assertTrue(meta.containsKey("foo"));
+ assertEquals("bar", meta.get("foo"));
+
+ // new metadata added
+ assertTrue(meta.containsKey("foobar"));
+ assertEquals("bazbam", meta.get("foobar"));
+
+ // remove metadata
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", null);
+ zkStateReader.exportAllAliases(cloned);
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
+ assertNotNull(meta);
+
+ // verify key was removed
+ assertFalse(meta.containsKey("foo"));
+
+ // but only the specified key was removed
+ assertTrue(meta.containsKey("foobar"));
+ assertEquals("bazbam", meta.get("foobar"));
+
+ // removal of non existent key should succeed.
+ cloned = zkStateReader.getAliases().cloneWithCollectionAliasMetadata("meta1", "foo", null);
+ zkStateReader.exportAllAliases(cloned);
+
+ // now check that an independently constructed ZkStateReader can see what we've done.
+ // i.e. the data is really in zookeeper
+ String zkAddress = cluster.getZkServer().getZkAddress();
+ // tracks whether the local variable was re-pointed at a reader created here, so that the
+ // finally block only closes a reader this test constructed itself
+ boolean createdZKSR = false;
+ try(SolrZkClient zkClient = new SolrZkClient(zkAddress, 30000)) {
+
+ ZkController.createClusterZkNodes(zkClient);
+
+ zkStateReader = new ZkStateReader(zkClient);
+ createdZKSR = true;
+ zkStateReader.createClusterStateWatchersAndUpdate();
+
+ meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
+ assertNotNull(meta);
+
+ // verify key was removed in independent view
+ assertFalse(meta.containsKey("foo"));
+
+ // but only the specified key was removed
+ assertTrue(meta.containsKey("foobar"));
+ assertEquals("bazbam", meta.get("foobar"));
+
+ // removing the alias in a clone is expected to leave no metadata behind for it
+ Aliases a = zkStateReader.getAliases();
+ Aliases clone = a.cloneWithCollectionAlias("meta1", null);
+ meta = clone.getCollectionAliasMetadata("meta1");
+ assertEquals(0,meta.size());
+ } finally {
+ if (createdZKSR) {
+ zkStateReader.close();
+ }
+ }
+ }
@Test
public void test() throws Exception {
Index: solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java (date 1509475828000)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/Aliases.java (date 1509666567000)
@@ -33,24 +33,43 @@
*/
public class Aliases {
- public static final Aliases EMPTY = new Aliases(Collections.emptyMap());
+ // need to be able to test this with == in constructor, can't use Collections.emptyMap()
+ @SuppressWarnings("unchecked")
+ public static final Aliases EMPTY = new Aliases(Collections.EMPTY_MAP);
+ private static final String COLLECTION_METADATA = "collection_metadata";
+ private static final String COLLECTION = "collection";
/** Map of "collection" string constant to ->
* alias name -> comma delimited list of collections */
- private final Map<String,Map<String,String>> aliasMap; // not-null
+ private final Map<String,Map> aliasMap; // not-null
- private final Map<String, List<String>> collectionAliasListMap; // not-null; computed from aliasMap
+ // aliasName --> metadataKey --> metadataValue
+ private Map<String, Map<String, String>> collectionAliasMetadata;
+
+ @SuppressWarnings("unchecked")
public static Aliases fromJSON(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return EMPTY;
}
- return new Aliases((Map<String,Map<String,String>>) Utils.fromJSON(bytes));
+ return new Aliases((Map) Utils.fromJSON(bytes));
}
- private Aliases(Map<String, Map<String,String>> aliasMap) {
+ /**
+ * Wraps the given top-level map (no copy is made) and caches its COLLECTION_METADATA entry,
+ * creating that entry when the map is non-empty and does not have one yet.
+ *
+ * @param aliasMap the top-level alias map; must not be null
+ */
+ @SuppressWarnings("unchecked")
+ private Aliases(Map<String, Map> aliasMap) {
this.aliasMap = aliasMap;
- collectionAliasListMap = convertMapOfCommaDelimitedToMapOfList(getCollectionAliasMap());
+ // Only the local parameter is rebound here; the field keeps the map that was passed in.
+ // The shared EMPTY_MAP instance marks "empty input" for the identity check below.
+ if (aliasMap.size() == 0) {
+ aliasMap = Collections.EMPTY_MAP;
+ }
+ this.collectionAliasMetadata = this.aliasMap.get(COLLECTION_METADATA);
+ // Skipped for empty input, so nothing is ever put into an empty map that was passed in
+ // (such as the Collections.EMPTY_MAP behind the EMPTY constant).
+ if (aliasMap != Collections.EMPTY_MAP) {
+ if (collectionAliasMetadata == null) {
+ // bootstrap it in if it doesn't exist.
+ Map<String, Map<String, String>> newMap = new HashMap<>();
+ this.aliasMap.put(COLLECTION_METADATA, newMap);
+ this.collectionAliasMetadata = newMap;
+ }
+ }
}
public static Map<String, List<String>> convertMapOfCommaDelimitedToMapOfList(Map<String, String> collectionAliasMap) {
@@ -66,21 +85,57 @@
* Does not return null.
* Prefer use of {@link #getCollectionAliasListMap()} instead, where appropriate.
*/
+ @SuppressWarnings("unchecked")
public Map<String,String> getCollectionAliasMap() {
- Map<String,String> cam = aliasMap.get("collection");
- return cam == null ? Collections.emptyMap() : Collections.unmodifiableMap(cam);
+ Map<String, String> cam = aliasMap.get(COLLECTION);
+ if (cam == null) {
+ return Collections.emptyMap();
+ } else {
+ HashMap<String, String> stringMap = new HashMap<>();
+ for (String alias : cam.keySet()) {
+ stringMap.put(alias, cam.get(alias));
+ }
+ return Collections.unmodifiableMap(stringMap);
+ }
}
/**
* Returns an unmodifiable Map of collection aliases mapped to a list of what the alias maps to.
* Does not return null.
*/
+ @SuppressWarnings("unchecked")
public Map<String,List<String>> getCollectionAliasListMap() {
- return Collections.unmodifiableMap(collectionAliasListMap);
+ Map listMap = aliasMap.get(COLLECTION);
+ if (listMap == null) {
+ return Collections.EMPTY_MAP;
+ }
+ // need to also ensure the list inside the map can't be changed, but don't want to convert lists in
+ // source map to immutable, so clone the source map first, then replace the values with immutable lists.
+ listMap = convertMapOfCommaDelimitedToMapOfList(listMap);
+ for (Object alias : listMap.keySet()) {
+ listMap.put(alias,Collections.unmodifiableList((List)listMap.get(alias)));
+ }
+ return Collections.unmodifiableMap(listMap);
+ }
+
+ /**
+ * Returns an unmodifiable Map of metadata for a given alias. This method never returns null:
+ * an empty map is returned when no metadata is recorded under the given alias name.
+ *
+ * @param alias the name of an alias also found as a key in {@link #getCollectionAliasListMap()}
+ * @return The metadata for the alias (possibly empty); an empty map if there is no entry for the alias.
+ */
+ public Map<String,String> getCollectionAliasMetadata(String alias) {
+ // the constructor does not create a metadata map when the alias map is empty
+ if (collectionAliasMetadata == null) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> map = collectionAliasMetadata.get(alias);
+ return map == null ? Collections.emptyMap() : Collections.unmodifiableMap(map);
}
-
+
public boolean hasCollectionAliases() {
- return !collectionAliasListMap.isEmpty();
+ Map map = aliasMap.get(COLLECTION);
+ return map != null && !map.isEmpty();
}
/**
@@ -89,7 +144,11 @@
* Treat the result as unmodifiable.
*/
public List<String> resolveAliases(String aliasName) {
- return resolveAliasesGivenAliasMap(collectionAliasListMap, aliasName);
+ Map collectionAliasMap = aliasMap.get(COLLECTION);
+ if (collectionAliasMap == null) {
+ return Collections.singletonList(aliasName);
+ }
+ return resolveAliasesGivenAliasMap(convertMapOfCommaDelimitedToMapOfList(collectionAliasMap), aliasName);
}
/** @lucene.internal */
@@ -97,6 +156,9 @@
//return collectionAliasListMap.getOrDefault(aliasName, Collections.singletonList(aliasName));
// TODO deprecate and remove this dubious feature?
// Due to another level of indirection, this is more complicated...
+ if (collectionAliasListMap == null) {
+ return Collections.singletonList(aliasName);
+ }
List<String> level1 = collectionAliasListMap.get(aliasName);
if (level1 == null) {
return Collections.singletonList(aliasName);// is a collection
@@ -115,25 +177,101 @@
/**
* Creates a new Aliases instance with the same data as the current one but with a modification based on the
- * parameters. If {@code collections} is null, then the {@code alias} is removed, otherwise it is added/updated.
+ * parameters.
+ * <p>
+ * Note that the state in zookeeper is unaffected by this method and the change must still be persisted via
+ * {@link ZkStateReader#exportAllAliases(Aliases)}
+ *
+ * @param alias the alias to update, must not be null
+ * @param collections the comma separated list of collections for the alias, null to remove the alias
*/
+ @SuppressWarnings("unchecked")
public Aliases cloneWithCollectionAlias(String alias, String collections) {
- Map<String,String> newCollectionMap = new HashMap<>(getCollectionAliasMap());
+ if (alias == null) {
+ throw new NullPointerException("Alias name cannot be null");
+ }
+ Aliases newAliases = cloneAliases(); // to avoid mutating the object we were given.
if (collections == null) {
- newCollectionMap.remove(alias);
+ newAliases.aliasMap.get(COLLECTION_METADATA).remove(alias);
+ newAliases.aliasMap.get(COLLECTION).remove(alias);
} else {
- newCollectionMap.put(alias, collections);
+ newAliases.aliasMap.get(COLLECTION).put(alias, collections);
}
- if (newCollectionMap.isEmpty()) {
+ if (!newAliases.hasCollectionAliases()) {
return EMPTY;
} else {
- return new Aliases(Collections.singletonMap("collection", newCollectionMap));
+ return newAliases;
}
}
+
+  /**
+   * Produces a copy of these aliases in which one metadata entry of an existing collection alias has been
+   * set or removed. This instance is left untouched.
+   * <p>
+   * Nothing is written to zookeeper by this method; persist the returned object with
+   * {@link ZkStateReader#exportAllAliases(Aliases)}.
+   *
+   * @param alias the alias whose metadata should change; must name an existing collection alias
+   * @param metadataKey the metadata key to set or remove; must not be null
+   * @param metadataValue the value to store under the key, or null to remove the key
+   * @return a copy of the aliases reflecting the metadata change
+   * @throws IllegalArgumentException if the alias is null or unknown, or if the key is null
+   */
+  @SuppressWarnings("unchecked")
+  public Aliases cloneWithCollectionAliasMetadata(String alias, String metadataKey, String metadataValue){
+    boolean knownAlias = alias != null && getCollectionAliasMap().get(alias) != null;
+    if (!knownAlias) {
+      throw new IllegalArgumentException(alias + " is not a valid alias");
+    }
+    if (metadataKey == null) {
+      throw new IllegalArgumentException("Null is not a valid metadata key");
+    }
+
+    // work on a deep copy so that this instance is never modified
+    Aliases copy = cloneAliases();
+    Map metadataByAlias = copy.aliasMap.get(COLLECTION_METADATA);
+    Map<String, String> aliasMetadata =
+        (Map<String, String>) metadataByAlias.computeIfAbsent(alias, name -> new HashMap<>());
+
+    if (metadataValue == null) {
+      aliasMetadata.remove(metadataKey);
+    } else {
+      aliasMetadata.put(metadataKey, metadataValue);
+    }
+    return copy;
+  }
+
+  /**
+   * Builds a deep copy of this object: the collection alias map, the metadata map and every per-alias
+   * metadata map are all freshly allocated, so the copy shares no mutable structure with this instance.
+   *
+   * @return a new Aliases holding copies of the collection aliases and their metadata
+   */
+  @SuppressWarnings("unchecked")
+  private Aliases cloneAliases() {
+    //TODO: think about modeling this with objects not collections if this gets any more complex...
+
+    // Always start from a fresh mutable map; callers clone in order to modify, so an absent map or the
+    // shared EMPTY_MAP must never end up inside the copy.
+    Map<String, String> collectionCopy = new HashMap<>();
+    Map sourceCollections = aliasMap.get(COLLECTION);
+    if (sourceCollections != null && sourceCollections != Collections.EMPTY_MAP) {
+      collectionCopy.putAll(sourceCollections);
+    }
+
+    // copy the metadata one alias at a time so that each nested map is duplicated as well
+    Map<String, Map<String, String>> metadataCopy = new HashMap<>();
+    Map<String, Map<String, String>> sourceMetadata = aliasMap.get(COLLECTION_METADATA);
+    if (sourceMetadata != null) {
+      for (Map.Entry<String, Map<String, String>> entry : sourceMetadata.entrySet()) {
+        metadataCopy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+      }
+    }
+
+    Map<String, Map> top = new HashMap<>();
+    top.put(COLLECTION, collectionCopy);
+    top.put(COLLECTION_METADATA, metadataCopy);
+    return new Aliases(top);
+  }
/** Serialize to ZooKeeper. */
public byte[] toJSON() {
- if (collectionAliasListMap.isEmpty()) {
+ if (!hasCollectionAliases()) {
return null;
} else {
return Utils.toJSON(aliasMap);
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (date 1509475828000)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (date 1509674445000)
@@ -162,6 +162,8 @@
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+ private final AliasWatcher aliasWatcher = new AliasWatcher();
+
/**
* Get current {@link AutoScalingConfig}.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
@@ -193,6 +195,70 @@
return new AutoScalingConfig(map);
}
+  /**
+   * Writes an alias to zk, and waits for up to 30 seconds to see that the new alias value
+   * is available before returning.
+   *
+   * @param aliasName The alias to store
+   * @param collections The comma separated collections that are part of the alias, or null to remove the alias
+   * @throws SolrException (SERVER_ERROR) if ZooKeeper reports an error or the calling thread is interrupted
+   */
+  public void exportAliasToZk(String aliasName, String collections) {
+    Aliases aliases = getAliases().cloneWithCollectionAlias(aliasName, collections);
+    try {
+      // TODO: worries me that aliases can't be independently updated...
+      // exportAllAliases also installs the new Aliases as the current ones while holding the update
+      // lock, so they are not assigned again afterwards: an unlocked assignment after the wait could
+      // overwrite a newer value stored by the alias watcher in the meantime.
+      exportAllAliases(aliases);
+      checkForAlias(aliasName, collections);
+      // some fudge for other nodes
+      Thread.sleep(100);
+    } catch (KeeperException e) {
+      LOG.error("", e);
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status so code further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      LOG.warn("", e);
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Persists the given aliases to the ALIASES node and installs them as this reader's current aliases.
+   * Both steps are performed while holding the update lock.
+   *
+   * @param aliases the aliases to write and to keep as the current view
+   * @throws KeeperException propagated from the ZooKeeper write
+   * @throws InterruptedException if we get interrupted while talking to zookeeper
+   */
+  public void exportAllAliases(Aliases aliases) throws KeeperException, InterruptedException {
+    synchronized (getUpdateLock()) { // ensure all threads can see the latest when doing this
+      // toJSON() yields null when there are no collection aliases; that null is what gets written
+      final byte[] payload = aliases.toJSON();
+      getZkClient().setData(ALIASES, payload, true);
+      // only reached when the write did not throw
+      this.aliases = aliases;
+    }
+  }
+
+  /**
+   * Waits (up to 30 seconds) until the alias named {@code name} maps to {@code value} in the aliases
+   * seen by this reader, and logs a warning if that does not happen in time.
+   * <p>
+   * NOTE(review): a condition whose wait timed out stays registered with the alias watcher until some
+   * later alias update satisfies it; nothing removes it on timeout.
+   *
+   * @param name the alias name to look for
+   * @param value the expected comma separated collection list, or null if the alias should be absent
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void checkForAlias(String name, String value) throws InterruptedException {
+    AliasCondition updateVisible = new AliasCondition() {
+      @Override
+      public boolean test(Aliases aliases) {
+        String collections = aliases.getCollectionAliasMap().get(name);
+        if (Objects.equals(collections, value)) {
+          this.latch.countDown();
+          return true;
+        }
+        return false;
+      }
+    };
+    this.aliasWatcher.addCondition(updateVisible);
+    updateVisible.await(30, TimeUnit.SECONDS);
+    // The latch is only counted down by a successful test(), so its count is the success signal.
+    // This replaces a plain boolean[] flag that was written by the watcher thread and read here
+    // without any visibility guarantee once the wait had timed out.
+    if (updateVisible.latch.getCount() > 0) {
+      LOG.warn("Timeout waiting to be notified of Alias change...");
+    }
+  }
+
+
+
+
private static class CollectionWatch {
int coreRefCount = 0;
@@ -386,7 +452,12 @@
refreshLiveNodes(null);
}
- /** Never null. */
+ /**
+ * Get an immutable copy of the present state of the aliases. References to this object should not be retained
+ * in any context where it will be important to know if aliases have changed.
+ *
+ * @return The current aliases, Aliases.EMPTY if not solr cloud, or no aliases have existed yet. Never returns null.
+ */
public Aliases getAliases() {
assert aliases != null;
return aliases;
@@ -436,45 +507,7 @@
refreshLegacyClusterState(new LegacyClusterStateWatcher());
refreshStateFormat2Collections();
refreshCollectionList(new CollectionsChildWatcher());
-
- synchronized (ZkStateReader.this.getUpdateLock()) {
- constructState(Collections.emptySet());
-
- zkClient.exists(ALIASES,
- new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events, and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- try {
- synchronized (ZkStateReader.this.getUpdateLock()) {
- LOG.debug("Updating aliases... ");
-
- // remake watch
- final Watcher thisWatch = this;
- final Stat stat = new Stat();
- final byte[] data = zkClient.getData(ALIASES, thisWatch, stat, true);
- ZkStateReader.this.aliases = Aliases.fromJSON(data);
- LOG.debug("New alias definition is: " + ZkStateReader.this.aliases.toString());
- }
- } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
- LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
- } catch (KeeperException e) {
- LOG.error("A ZK error has occurred", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted", e);
- }
- }
-
- }, true);
- }
- updateAliases();
+ refreshAliases(this.aliasWatcher);
if (securityNodeListener != null) {
addSecuritynodeWatcher(pair -> {
@@ -488,6 +521,14 @@
}
}
+ /**
+ * Registers the given watcher on the ALIASES node and then calls {@code updateAliases()}.
+ * While holding the update lock, {@code constructState} is invoked with an empty set and the
+ * watcher is registered via {@code zkClient.exists}; {@code updateAliases()} runs after the
+ * lock has been released.
+ *
+ * @param watcher the watcher to register on the ALIASES node
+ * @throws KeeperException propagated from the ZooKeeper calls made here
+ * @throws InterruptedException propagated from the ZooKeeper calls made here
+ */
+ private void refreshAliases(AliasWatcher watcher) throws KeeperException, InterruptedException {
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ constructState(Collections.emptySet());
+ zkClient.exists(ALIASES, watcher, true);
+ }
+ updateAliases();
+ }
+
private void addSecuritynodeWatcher(final Callable<Pair<byte[], Stat>> callback)
throws KeeperException, InterruptedException {
zkClient.exists(SOLR_SECURITY_CONF_PATH,
@@ -1473,4 +1514,74 @@
}
+  /**
+   * A class to watch for changes to aliases. There should only ever be one instance of this class
+   * per instance of ZkStateReader. Normally it will not be useful to create a new instance since
+   * this watcher automatically re-registers itself every time it is updated.
+   * <p>
+   * Callers may also register {@link AliasCondition}s; each one is tested against the reader's current
+   * aliases when it is added and again on every alias change until its test succeeds.
+   */
+  private class AliasWatcher implements Watcher {
+
+    /** Conditions still waiting to be satisfied; guarded by synchronizing on the list itself. */
+    private final List<AliasCondition> conditions = new ArrayList<>();
+
+    @Override
+    public void process(WatchedEvent event) {
+      // session events are not change events, and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+      try {
+        synchronized (ZkStateReader.this.getUpdateLock()) {
+          LOG.debug("Updating aliases... ");
+
+          // re-register the watch
+          final byte[] data = zkClient.getData(ALIASES, this, null, true);
+          final Aliases aliases = Aliases.fromJSON(data);
+          ZkStateReader.this.aliases = aliases;
+          LOG.debug("New alias definition is: {}", aliases);
+          synchronized (conditions) {
+            // Drop every condition that is now satisfied. removeIf is used because removing an
+            // element from the list inside a for-each loop over that same list fails with a
+            // ConcurrentModificationException.
+            conditions.removeIf(condition -> condition.test(aliases));
+          }
+        }
+      } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+        LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
+      } catch (KeeperException e) {
+        LOG.error("A ZK error has occurred", e);
+        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted", e);
+      }
+    }
+
+    /**
+     * Registers a condition to be re-checked on alias changes. The condition is tested immediately
+     * against the reader's current aliases and is only queued if that test fails.
+     *
+     * @param condition the condition to evaluate now and, if unmet, on later alias updates
+     */
+    void addCondition(AliasCondition condition) {
+      synchronized (conditions) {
+        if (!condition.test(ZkStateReader.this.aliases)) {
+          conditions.add(condition);
+        }
+      }
+    }
+  }
+
+  /**
+   * A one-shot condition on the aliases that a thread can wait for. Implementations of
+   * {@link #test(Aliases)} are expected to count down {@link #latch} once the condition holds,
+   * which releases any thread blocked in {@link #await(long, TimeUnit)}.
+   */
+  private abstract class AliasCondition {
+    // final: the same latch must be shared by the waiting thread and the thread running test()
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    /**
+     * Blocks until the latch has been counted down or the timeout elapses.
+     *
+     * @param timeout the maximum time to wait
+     * @param units the unit of {@code timeout}
+     * @return true if the latch was counted down, false if the timeout elapsed first
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public boolean await(long timeout, TimeUnit units) throws InterruptedException {
+      // pass the result through so callers can tell a timeout from success
+      return latch.await(timeout, units);
+    }
+
+    /**
+     * Perform some test on the aliases. Typical implementations should count down the latch and
+     * return true if the test succeeds.
+     *
+     * @param aliases the aliases object to check.
+     * @return true if the desired condition is met and the latch has been counted down.
+     */
+    abstract boolean test(Aliases aliases);
+  }
+
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment