@markrmiller
Created August 21, 2012 21:09
Index: solr/testlogging.properties
===================================================================
--- solr/testlogging.properties (revision 1375795)
+++ solr/testlogging.properties (working copy)
@@ -4,15 +4,22 @@
java.util.logging.ConsoleHandler.formatter=org.apache.solr.SolrLogFormatter
-#.level=SEVERE
-.level=INFO
+.level=SEVERE
+#.level=INFO
-#org.apache.solr.update.processor.LogUpdateProcessor=FINEST
-#org.apache.solr.update.processor.DistributedUpdateProcessor=FINEST
+#org.apache.solr.update.processor.LogUpdateProcessor.level=FINEST
+#org.apache.solr.update.processor.DistributedUpdateProcessor.level=FINEST
+org.apache.solr.update.DefaultSolrCoreState.level=FINEST
+org.apache.solr.cloud.ZkController.level=FINEST
+org.apache.solr.cloud.ShardLeaderElectionContext.level=FINEST
#org.apache.solr.update.PeerSync.level=FINEST
-#org.apache.solr.cloud.RecoveryStrategy.level=FINEST
-#org.apache.solr.cloud.SyncStrategy.level=FINEST
-#org.apache.solr.update.DefaultSolrCoreState.level=FINEST
+org.apache.solr.cloud.RecoveryStrategy.level=FINEST
+org.apache.solr.cloud.SyncStrategy.level=FINEST
+org.apache.solr.handler.admin.CoreAdminHandler.level=FINEST
+org.apache.solr.common.cloud.ConnectionManager.level=FINEST
+org.apache.solr.update.SolrCmdDistributor.level=FINEST
#org.apache.solr.update.UpdateLog.level=FINE
#org.apache.solr.update.TransactionLog.level=FINEST
+org.apache.solr.cloud.ChaosMonkey.level=FINEST
+
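The hunk above drops the root test-logging level to SEVERE and turns on FINEST logging for the SolrCloud classes under investigation (ZkController, RecoveryStrategy, SyncStrategy, CoreAdminHandler, ChaosMonkey, and friends). As a minimal sketch (not part of the patch), this is a standard java.util.logging properties file; assuming it is not already picked up via -Djava.util.logging.config.file, it could be loaded explicitly like this:

// Illustrative sketch only (not part of the patch): load testlogging.properties
// into java.util.logging, equivalent to passing
// -Djava.util.logging.config.file=solr/testlogging.properties on the JVM command line.
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.logging.LogManager;
import java.util.logging.Logger;

public class TestLoggingConfig {
  public static void main(String[] args) throws Exception {
    InputStream in = new FileInputStream("solr/testlogging.properties");
    try {
      LogManager.getLogManager().readConfiguration(in);
    } finally {
      in.close();
    }
    // With the patched config, this logger passes the FINEST level check
    // while loggers falling back to the root level stay at SEVERE.
    Logger log = Logger.getLogger("org.apache.solr.cloud.ZkController");
    log.finest("visible if the handler level allows it");
    Logger.getLogger("org.example.Other").info("suppressed by the SEVERE root level");
  }
}

Whether FINEST records actually reach the console also depends on the ConsoleHandler level configured earlier in the same file.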
Index: solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
===================================================================
--- solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (revision 1375795)
+++ solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (working copy)
@@ -38,11 +38,11 @@
import org.slf4j.LoggerFactory;
@Slow
-@Ignore("ignore while investigating jenkins fails")
+//@Ignore("ignore while investigating jenkins fails")
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
- private static final int BASE_RUN_LENGTH = 45000;
+ private static final int BASE_RUN_LENGTH = 8000;
@BeforeClass
public static void beforeSuperClass() {
@@ -56,8 +56,8 @@
@Override
public void setUp() throws Exception {
super.setUp();
- // TODO use @Noisy annotation as we expect lots of exceptions
- //ignoreException(".*");
+ // can help to hide this when testing and looking at logs
+ //ignoreException("shard update error");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@@ -71,8 +71,8 @@
public ChaosMonkeyNothingIsSafeTest() {
super();
- sliceCount = 3;
- shardCount = 12;
+ sliceCount = 1;
+ shardCount = 5;
}
@Override
@@ -83,9 +83,16 @@
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
+ // make sure we have leaders for each shard
+ for (int j = 1; j < sliceCount; j++) {
+ zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+ }
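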
+
+ waitForRecoveriesToFinish(false);
+
// we cannot do delete by query
// as it's not supported for recovery
- // del("*:*");
+ del("*:*");
List<StopableThread> threads = new ArrayList<StopableThread>();
int threadCount = 1;
@@ -152,6 +159,7 @@
zkStateReader.updateClusterState(true);
assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
+
// we dont't current check vs control because the full throttle thread can
// have request fails
checkShardConsistency(false, true);
Index: solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
===================================================================
--- solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (revision 1375795)
+++ solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (working copy)
@@ -50,11 +50,6 @@
@Override
public void setUp() throws Exception {
super.setUp();
- // we expect this time of exception as shards go up and down...
- //ignoreException(".*");
-
- // sometimes we cannot get the same port
- ignoreException("java\\.net\\.BindException: Address already in use");
System.setProperty("numShards", Integer.toString(sliceCount));
}
Index: solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
===================================================================
--- solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (working copy)
@@ -183,15 +183,9 @@
// set num nodes
numNodes = zkController.getClusterState().getLiveNodes().size();
- // the leader is...
- // TODO: if there is no leader, wait and look again
- // TODO: we are reading the leader from zk every time - we should cache
- // this and watch for changes?? Just pull it from ZkController cluster state probably?
String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
try {
- // TODO: if we find out we cannot talk to zk anymore, we should probably realize we are not
- // a leader anymore - we shouldn't accept updates at all??
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
collection, shardId));
@@ -201,7 +195,21 @@
isLeader = coreNodeName.equals(leaderNodeName);
DistribPhase phase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+ // defensive checks
+ boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
+ if (DistribPhase.FROMLEADER == phase && localIsLeader) {
+ log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader");
+ }
+
+ if (isLeader && !localIsLeader) {
+ log.error("Our cloud state says we are the leader, but locally we don't think so :" + req.getParamString());
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Our cloud state says we are the leader, but locally we don't think so");
+ }
+ // done defensive checks
+
if (DistribPhase.FROMLEADER == phase) {
// we are coming from the leader, just go local - add no urls
@@ -329,6 +337,8 @@
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
params.remove("commit"); // this will be distributed from the local commit
+ params.set("distrib.update", "commit from " + ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribAdd(cmd, nodes, params);
}
@@ -378,9 +388,11 @@
// TODO: we should do this in the background it would seem
for (SolrCmdDistributor.Error error : response.errors) {
- if (error.node instanceof RetryNode) {
+ if (error.node instanceof RetryNode || error.e instanceof SolrException) {
// we don't try to force a leader to recover
// when we cannot forward to it
+ // and we assume SolrException means
+ // the node went down
continue;
}
// TODO: we should force their state to recovering ??
@@ -819,6 +831,8 @@
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set("update.from", ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribDelete(cmd, replicas, params);
cmdDistrib.finish();
}
@@ -949,15 +963,17 @@
if (zkEnabled) {
zkCheck();
}
-
+ System.out.println("COMMIT");
if (vinfo != null) {
vinfo.lockForUpdate();
}
try {
if (ulog == null || ulog.getState() == UpdateLog.State.ACTIVE || (cmd.getFlags() & UpdateCommand.REPLAY) != 0) {
+ System.out.println("LOCALCOMMIT");
super.processCommit(cmd);
} else {
+ System.out.println("IGNORING");
log.info("Ignoring commit while not ACTIVE - state: " + ulog.getState() + " replay:" + (cmd.getFlags() & UpdateCommand.REPLAY));
}
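The DistributedUpdateProcessor hunks above add two defensive checks comparing the request's distrib.update phase and the ZK cluster state against the core's local leader flag. A minimal sketch of those two invariants (not part of the patch), reusing the existing SolrException and DistribPhase types:

// Illustrative sketch (not part of the patch): the invariants the new
// "defensive checks" enforce before an update is processed.
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;

public class LeaderStateChecks {
  // isLeader      - what ZK cluster state says about this core
  // localIsLeader - what the core's own CloudDescriptor flag says
  // phase         - the distrib.update phase sent by the requester
  public static void check(boolean isLeader, boolean localIsLeader, DistribPhase phase) {
    if (DistribPhase.FROMLEADER == phase && localIsLeader) {
      // a leader should never see an update claiming to be forwarded from the leader
      throw new SolrException(ErrorCode.BAD_REQUEST,
          "Request says it is coming from leader, but we are the leader");
    }
    if (isLeader && !localIsLeader) {
      // cluster state and local state disagree about leadership
      throw new SolrException(ErrorCode.BAD_REQUEST,
          "Our cloud state says we are the leader, but locally we don't think so");
    }
  }
}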
Index: solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
===================================================================
--- solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (working copy)
@@ -166,6 +166,8 @@
addCommit(ureq, cmd);
+ log.info("Distrib commit to:" + nodes);
+
for (Node node : nodes) {
submit(ureq, node);
}
@@ -345,7 +347,8 @@
try {
semaphore.acquire();
} catch (InterruptedException e) {
- throw new RuntimeException();
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Update thread interrupted");
}
pending.add(completionService.submit(task));
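The SolrCmdDistributor hunk above also changes how an interrupted semaphore.acquire() is handled: the thread's interrupt status is restored before an unchecked exception is thrown. A small self-contained sketch of that idiom (not part of the patch):

// Illustrative sketch (not part of the patch): restore interrupt status before
// converting the checked InterruptedException into an unchecked failure.
import java.util.concurrent.Semaphore;

public class AcquireExample {
  private final Semaphore semaphore = new Semaphore(8);

  public void acquireOrFail() {
    try {
      semaphore.acquire();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      throw new RuntimeException("Update thread interrupted", e);
    }
  }
}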
Index: solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
===================================================================
--- solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (working copy)
@@ -25,6 +25,7 @@
import java.util.Random;
import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -40,6 +41,7 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
/**
* Run solr using jetty
@@ -207,8 +209,8 @@
}
if (!server.isRunning()) {
- server.start();
RUNNING_JETTIES.put(this, new RuntimeException());
+ server.start();
}
synchronized (JettySolrRunner.this) {
int cnt = 0;
@@ -225,10 +227,32 @@
}
public void stop() throws Exception {
- if (!server.isStopped() && !server.isStopping()) {
- server.stop();
- RUNNING_JETTIES.remove(this);
+ // we try and do a bunch of extra stop stuff because
+ // jetty doesn't like to stop if it started
+ // and ended up in a failure state (like when it cannot get the port)
+ if (server.getState().equals(Server.FAILED)) {
+ Connector[] connectors = server.getConnectors();
+ for (Connector connector : connectors) {
+ connector.stop();
+ }
}
+ Filter filter = dispatchFilter.getFilter();
+ ThreadPool threadPool = server.getThreadPool();
+ server.getServer().stop();
+ server.stop();
+ if (threadPool instanceof QueuedThreadPool) {
+ ((QueuedThreadPool) threadPool).setMaxStopTimeMs(15000);
+ ((QueuedThreadPool) threadPool).stop();
+ ((QueuedThreadPool) threadPool).stop();
+ ((QueuedThreadPool) threadPool).stop();
+ }
+ //server.destroy();
+ if (server.getState().equals(Server.FAILED)) {
+ filter.destroy();
+ }
+
+ RUNNING_JETTIES.remove(this);
+
server.join();
}
Index: solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
===================================================================
--- solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (working copy)
@@ -20,6 +20,7 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
@@ -47,6 +48,7 @@
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.ReturnFields;
import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
@@ -507,6 +509,24 @@
String sync = params.get("sync");
if (sync != null) {
processSync(rb, nVersions, sync);
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder =
+// rb.req.getCore().getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+// //cuc.softCommit = true;
+// cuc.waitSearcher = true;
+// rb.req.getCore().getUpdateHandler().commit(cuc);
+// System.out.println(rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+// + " synched "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
return;
}
Index: solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
===================================================================
--- solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (working copy)
@@ -28,6 +28,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.cloud.CloudDescriptor;
@@ -698,7 +699,7 @@
protected void handleRequestSyncAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException {
final SolrParams params = req.getParams();
-
+ System.out.println("WENEEDOTSYNC");
log.info("I have been requested to sync up my shard");
ZkController zkController = coreContainer.getZkController();
if (zkController == null) {
@@ -721,6 +722,21 @@
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props));
+ // solrcloud_debug
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder =
+// core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+// + " synched "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
}
@@ -750,8 +766,11 @@
String coreNodeName = params.get("coreNodeName");
String waitForState = params.get("state");
Boolean checkLive = params.getBool("checkLive");
+ Boolean onlyIfLeader = params.getBool("onlyIfLeader");
int pauseFor = params.getInt("pauseFor", 0);
+
+
String state = null;
boolean live = false;
int retry = 0;
@@ -764,6 +783,12 @@
+ cname);
}
if (core != null) {
+ if (onlyIfLeader != null && onlyIfLeader) {
+ if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "We are not the leader");
+ }
+ }
+
// wait until we are sure the recovering node is ready
// to accept updates
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
Index: solr/core/src/java/org/apache/solr/core/CoreContainer.java
===================================================================
--- solr/core/src/java/org/apache/solr/core/CoreContainer.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/core/CoreContainer.java (working copy)
@@ -598,15 +598,16 @@
}
cores.clear();
} finally {
+ if (shardHandlerFactory != null) {
+ shardHandlerFactory.close();
+ }
+ // we want to close zk stuff last
if(zkController != null) {
zkController.close();
}
if (zkServer != null) {
zkServer.stop();
}
- if (shardHandlerFactory != null) {
- shardHandlerFactory.close();
- }
}
}
}
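The CoreContainer change reorders shutdown so the ShardHandlerFactory is closed before the ZooKeeper controller and server, keeping ZooKeeper available to anything still using it during teardown. A minimal sketch of that ordering (not part of the patch; the types here are hypothetical placeholders):

// Illustrative sketch (not part of the patch): close dependents before the
// resources they depend on, and keep ZooKeeper teardown last.
final class ShutdownOrderExample {
  interface Closable { void close(); }

  void shutdown(Closable shardHandlerFactory, Closable zkController, Closable zkServer) {
    try {
      if (shardHandlerFactory != null) shardHandlerFactory.close(); // may still talk to ZK
    } finally {
      try {
        if (zkController != null) zkController.close();             // ZK client
      } finally {
        if (zkServer != null) zkServer.close();                     // embedded ZK server, last
      }
    }
  }
}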
Index: solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (working copy)
@@ -185,6 +185,7 @@
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.RECOVERING);
prepCmd.setCheckLive(true);
+ prepCmd.setOnlyIfLeader(true);
prepCmd.setPauseFor(6000);
server.request(prepCmd);
@@ -305,7 +306,10 @@
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
- if (isLeader) {
+ if (isLeader && !cloudDesc.isLeader) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+ }
+ if (cloudDesc.isLeader) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
log.info("Finished recovery process. core=" + coreName);
@@ -333,9 +337,6 @@
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName);
- // System.out
- // .println("Sync Recovery was successful - registering as Active "
- // + zkController.getNodeName());
// solrcloud_debug
// try {
Index: solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (working copy)
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -13,6 +14,8 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -157,13 +160,28 @@
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
boolean success = syncStrategy.sync(zkController, core, leaderProps);
+ // solrcloud_debug
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder =
+// core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+// + " synched "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
if (!success && anyoneElseActive()) {
rejoinLeaderElection(leaderSeqPath, core);
return;
}
}
log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps));
-
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = true;
// If I am going to be the leader I have to be active
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
Index: solr/core/src/java/org/apache/solr/cloud/ZkController.java
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/ZkController.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/cloud/ZkController.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -209,6 +210,45 @@
});
+
+ zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() {
+
+ @Override
+ public void disconnected() {
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ // re register all descriptors
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ descriptor.getCloudDescriptor().isLeader = false;
+ }
+ }
+ }
+ });
+
+ zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() {
+
+ @Override
+ public void connected() {
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+ String leaderUrl;
+ try {
+ leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+ .getCoreUrl();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName());
+ boolean isLeader = leaderUrl.equals(ourUrl);
+ log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl);
+ cloudDesc.isLeader = isLeader;
+ }
+ }
+ }
+ });
+
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor();
@@ -531,25 +571,7 @@
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- // rather than look in the cluster state file, we go straight to the zknodes
- // here, because on cluster restart there could be stale leader info in the
- // cluster state node that won't be updated for a moment
- String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
-
- // now wait until our currently cloud state contains the latest leader
- String clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
- int tries = 0;
- while (!leaderUrl.equals(clusterStateLeader)) {
- if (tries == 60) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "There is conflicting information about the leader of shard: "
- + cloudDesc.getShardId() + " our state says:" + clusterStateLeader + " but zookeeper says:" + leaderUrl);
- }
- Thread.sleep(1000);
- tries++;
- clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
- }
+ String leaderUrl = getLeader(cloudDesc);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -604,6 +626,45 @@
return shardId;
}
+
+ private String getLeader(final CloudDescriptor cloudDesc) {
+
+ String collection = cloudDesc.getCollectionName();
+ String shardId = cloudDesc.getShardId();
+ // rather than look in the cluster state file, we go straight to the zknodes
+ // here, because on cluster restart there could be stale leader info in the
+ // cluster state node that won't be updated for a moment
+ String leaderUrl;
+ try {
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ .getCoreUrl();
+
+ // now wait until our currently cloud state contains the latest leader
+ String clusterStateLeader = zkStateReader.getLeaderUrl(collection,
+ shardId, 30000);
+ int tries = 0;
+ while (!leaderUrl.equals(clusterStateLeader)) {
+ if (tries == 60) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "There is conflicting information about the leader of shard: "
+ + cloudDesc.getShardId() + " our state says:"
+ + clusterStateLeader + " but zookeeper says:" + leaderUrl);
+ }
+ Thread.sleep(1000);
+ tries++;
+ clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId,
+ 30000);
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ .getCoreUrl();
+ }
+
+ } catch (Exception e) {
+ log.error("Error getting leader from zk", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Error getting leader from zk", e);
+ }
+ return leaderUrl;
+ }
/**
* Get leader props directly from zk nodes.
@@ -615,8 +676,9 @@
* @throws InterruptedException
*/
private ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice) throws KeeperException, InterruptedException {
+ final String slice) throws InterruptedException {
int iterCount = 60;
+ Exception exp = null;
while (iterCount-- > 0) {
try {
byte[] data = zkClient.getData(
@@ -625,11 +687,17 @@
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
ZkNodeProps.load(data));
return leaderProps;
- } catch (NoNodeException e) {
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ exp = e;
Thread.sleep(500);
}
+ if (cc.isShutDown()) {
+ throw new RuntimeException("CoreContainer is shutdown");
+ }
}
- throw new RuntimeException("Could not get leader props");
+ throw new RuntimeException("Could not get leader props", exp);
}
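The new getLeader/getLeaderProps code above retries reads of the leader znode, remembers the last failure, propagates interrupts immediately, and bails out if the CoreContainer is shutting down. A generic sketch of that retry shape (not part of the patch):

// Illustrative sketch (not part of the patch): retry on failure, remember the
// last exception, never swallow interrupts, and attach the last failure as the
// cause when giving up.
public class RetryExample {
  interface Attempt<T> { T run() throws Exception; }

  static <T> T retry(Attempt<T> attempt, int maxTries, long sleepMs) throws InterruptedException {
    Exception last = null;
    for (int i = 0; i < maxTries; i++) {
      try {
        return attempt.run();
      } catch (InterruptedException e) {
        throw e;                 // propagate interrupts immediately
      } catch (Exception e) {
        last = e;                // remember why the last try failed
        Thread.sleep(sleepMs);
      }
    }
    throw new RuntimeException("Could not get leader props", last);
  }
}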
Index: solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (working copy)
@@ -26,7 +26,9 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
@@ -35,6 +37,7 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
@@ -224,20 +227,40 @@
requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
- } catch (Exception e) {
- SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
+ } catch (Throwable t) {
+ SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
}
} else {
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());
}
+
}
-
+// HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(leaderProps));
+//
+//
+// ModifiableSolrParams params = new ModifiableSolrParams();
+//
+// UpdateRequest ur = new UpdateRequest();
+// ur.setParams(params);
+// params.set( UpdateParams.COMMIT, "true" );
+// params.set( UpdateParams.SOFT_COMMIT, false);
+// params.set(UpdateParams.OPEN_SEARCHER, false);
+// params.set( UpdateParams.WAIT_SEARCHER, false);
+// try {
+// ur.process(server);
+//
+// } catch (SolrServerException e) {
+// throw new RuntimeException();
+// } catch (IOException e) {
+// throw new RuntimeException();
+// }
}
private boolean handleResponse(ShardResponse srsp) {
NamedList<Object> response = srsp.getSolrResponse().getResponse();
// TODO: why does this return null sometimes?
+ System.out.println("resp:" + response);
if (response == null) {
return false;
}
Index: solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
===================================================================
--- solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (revision 1375795)
+++ solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (working copy)
@@ -26,6 +26,12 @@
private String roles = null;
private Integer numShards;
+ volatile boolean isLeader = false;
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
public void setShardId(String shardId) {
this.shardId = shardId;
}
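CloudDescriptor gains a volatile isLeader flag that ZooKeeper connection callbacks and the election context write while request threads read it. A minimal sketch of why volatile is sufficient here (not part of the patch):

// Illustrative sketch (not part of the patch): one thread (a ZooKeeper event or
// election thread) flips the flag; request threads read it. volatile guarantees
// readers see the latest write without extra synchronization.
public class LeaderFlagExample {
  private volatile boolean isLeader = false;

  // called from the election / reconnect path
  public void becomeLeader(boolean leader) { this.isLeader = leader; }

  // called from update request handling
  public boolean isLeader() { return isLeader; }
}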
Index: solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
===================================================================
--- solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (revision 1375795)
+++ solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (working copy)
@@ -801,7 +801,7 @@
SolrDocumentList lst1 = lastJetty.client.solrClient.query(query).getResults();
SolrDocumentList lst2 = cjetty.client.solrClient.query(query).getResults();
- showDiff(lst1, lst2, lastJetty.toString(), cjetty.client.solrClient.toString());
+ showDiff(lst1, lst2, lastJetty.url, cjetty.url);
}
}
@@ -1130,7 +1130,8 @@
try {
commit();
- } catch (Exception e) {
+ } catch (Throwable t) {
+ t.printStackTrace();
// we don't care if this commit fails on some nodes
}
@@ -1146,8 +1147,8 @@
retry = true;
}
cnt++;
- if (cnt > 2) break;
- Thread.sleep(4000);
+ if (cnt > 4) break;
+ Thread.sleep(2000);
} while (retry);
}
Index: solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
===================================================================
--- solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (revision 1375795)
+++ solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (working copy)
@@ -17,7 +17,8 @@
* limitations under the License.
*/
-import java.net.BindException;
+import java.io.IOException;
+import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -181,9 +182,7 @@
}
jetty.stop();
- if (sdf != null) {
- sdf.destroy();
- }
+ stop(jetty);
if (!jetty.isStopped()) {
throw new RuntimeException("could not kill jetty");
@@ -441,6 +440,7 @@
}
public static boolean start(JettySolrRunner jetty) throws Exception {
+
try {
jetty.start();
} catch (Exception e) {
@@ -454,7 +454,7 @@
try {
jetty.start();
} catch (Exception e3) {
- log.error("", e3);
+ log.error("Could not get the port to start jetty again", e3);
// we coud not get the port
jetty.stop();
return false;
@@ -463,5 +463,27 @@
}
return true;
}
+
+ private static boolean checkPort(int port) {
+ boolean running = false;
+ Socket socket = null;
+ try {
+ socket = new Socket((String) null, port);
+ System.out.println("RUNNING");
+ running = true;
+ } catch (IOException e) {
+ // not running
+ System.out.println("NOTRUNNING");
+ e.printStackTrace();
+ } finally {
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e) {}
+ }
+ }
+
+ return running;
+ }
}
\ No newline at end of file
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (working copy)
@@ -18,18 +18,65 @@
*/
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.SolrException;
import org.apache.zookeeper.SolrZooKeeper;
import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
public abstract class ZkClientConnectionStrategy {
+ private static Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class);
+
+ private List<DisconnectedListener> disconnectedListeners = new ArrayList<DisconnectedListener>();
+ private List<ConnectedListener> connectedListeners = new ArrayList<ConnectedListener>();
+
public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+ public synchronized void disconnected() {
+ for (DisconnectedListener listener : disconnectedListeners) {
+ try {
+ listener.disconnected();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
+ }
+ }
+
+ public synchronized void connected() {
+ for (ConnectedListener listener : connectedListeners) {
+ try {
+ listener.connected();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
+ }
+ }
+
+ public interface DisconnectedListener {
+ public void disconnected();
+ };
+
+ public interface ConnectedListener {
+ public void connected();
+ };
+
+
+ public synchronized void addDisconnectedListener(DisconnectedListener listener) {
+ disconnectedListeners.add(listener);
+ }
+
+ public synchronized void addConnectedListener(ConnectedListener listener) {
+ connectedListeners.add(listener);
+ }
+
public static abstract class ZkUpdate {
public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
}
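ZkClientConnectionStrategy now exposes connected/disconnected listener hooks; the ZkController hunk earlier in this patch registers one of each to clear and then recompute each core's isLeader flag across a ZooKeeper disconnect. A minimal usage sketch (not part of the patch), assuming a SolrZkClient built with the getZkClientConnectionStrategy() accessor this patch adds:

// Illustrative sketch (not part of the patch): wiring up the new listener hooks,
// roughly as ZkController does earlier in the patch.
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkClientConnectionStrategy;

public class ConnectionListenerExample {
  public void register(SolrZkClient zkClient) {
    ZkClientConnectionStrategy strategy = zkClient.getZkClientConnectionStrategy();

    strategy.addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() {
      @Override
      public void disconnected() {
        // pessimistically drop any local claim to leadership while ZK is unreachable
      }
    });

    strategy.addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() {
      @Override
      public void connected() {
        // re-derive leadership from the leader znode once the session is back
      }
    });
  }
}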
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (working copy)
@@ -38,6 +38,8 @@
private boolean connected;
private final ZkClientConnectionStrategy connectionStrategy;
+
+ private Object connectionUpdateLock = new Object();
private String zkServerAddress;
@@ -72,6 +74,7 @@
}
if (isClosed) {
+ log.info("Client->ZooKeeper status change trigger but we are already closed");
return;
}
@@ -79,28 +82,25 @@
if (state == KeeperState.SyncConnected) {
connected = true;
clientConnected.countDown();
+ connectionStrategy.connected();
} else if (state == KeeperState.Expired) {
connected = false;
- log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
-
+ log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
+
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper) {
// if keeper does not replace oldKeeper we must be sure to close it
- synchronized (connectionStrategy) {
+ synchronized (connectionUpdateLock) {
try {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
- } catch (InterruptedException e1) {
- closeKeeper(keeper);
- Thread.currentThread().interrupt();
- throw new RuntimeException("Giving up on connecting - we were interrupted", e1);
} catch (Exception e1) {
closeKeeper(keeper);
throw new RuntimeException(e1);
}
-
+ log.info("Connection with ZooKeeper reestablished.");
try {
client.updateKeeper(keeper);
} catch (InterruptedException e) {
@@ -129,7 +129,9 @@
}
log.info("Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
+ log.info("zkClient has disconnected");
connected = false;
+ connectionStrategy.disconnected();
} else {
connected = false;
}
@@ -151,19 +153,26 @@
}
public synchronized void waitForConnected(long waitForConnection)
- throws InterruptedException, TimeoutException {
+ throws TimeoutException {
+ log.info("Waiting for client to connect to ZooKeeper");
long expire = System.currentTimeMillis() + waitForConnection;
long left = 1;
while (!connected && left > 0) {
if (isClosed) {
break;
}
- wait(500);
+ try {
+ wait(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
+ log.info("Client is connected to ZooKeeper");
}
public synchronized void waitForDisconnected(long timeout)
Index: solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (working copy)
@@ -54,4 +54,5 @@
}
+
}
Index: solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (working copy)
@@ -74,6 +74,7 @@
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
private volatile boolean isClosed = false;
+ private ZkClientConnectionStrategy zkClientConnectionStrategy;
/**
* @param zkServerAddress
@@ -116,6 +117,7 @@
*/
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
+ this.zkClientConnectionStrategy = strat;
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
try {
@@ -135,29 +137,24 @@
}
}
});
- } catch (IOException e) {
- connManager.close();
- throw new RuntimeException();
- } catch (InterruptedException e) {
- connManager.close();
- throw new RuntimeException();
- } catch (TimeoutException e) {
+ } catch (Throwable e) {
connManager.close();
throw new RuntimeException();
}
+
try {
connManager.waitForConnected(clientConnectTimeout);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- connManager.close();
- throw new RuntimeException();
- } catch (TimeoutException e) {
+ } catch (Throwable e) {
connManager.close();
throw new RuntimeException();
}
numOpens.incrementAndGet();
}
+ public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
+ return zkClientConnectionStrategy;
+ }
+
/**
* @return true if client is connected
*/
Index: solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (working copy)
@@ -242,14 +242,16 @@
theUrlList.addAll(urlList);
}
Collections.shuffle(theUrlList, rand);
- if (replicas != null) {
+ if (sendToLeaders) {
ArrayList<String> theReplicas = new ArrayList<String>(replicasList.size());
theReplicas.addAll(replicasList);
Collections.shuffle(theReplicas, rand);
-
+ // System.out.println("leaders:" + theUrlList);
+ // System.out.println("replicas:" + theReplicas);
theUrlList.addAll(theReplicas);
}
- //System.out.println("########################## MAKING REQUEST TO " + theUrlList);
+
+ // System.out.println("########################## MAKING REQUEST TO " + theUrlList);
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList);
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
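The CloudSolrServer fix keys the fallback logic off sendToLeaders rather than the replicas reference, so requests try shuffled leader URLs first and shuffled replica URLs only afterwards. A small sketch of the resulting ordering (not part of the patch):

// Illustrative sketch (not part of the patch): shuffled leader URLs first, then
// shuffled replica URLs appended as fallbacks when sendToLeaders is true.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class UrlOrderExample {
  static List<String> buildUrlList(List<String> leaders, List<String> replicas,
                                   boolean sendToLeaders, Random rand) {
    List<String> urls = new ArrayList<String>(leaders);
    Collections.shuffle(urls, rand);
    if (sendToLeaders) {
      List<String> shuffledReplicas = new ArrayList<String>(replicas);
      Collections.shuffle(shuffledReplicas, rand);
      urls.addAll(shuffledReplicas);   // replicas only after every leader
    }
    return urls;
  }
}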
Index: solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
===================================================================
--- solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (revision 1375795)
+++ solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (working copy)
@@ -121,7 +121,9 @@
protected String state;
protected Boolean checkLive;
protected Integer pauseFor;
+ protected Boolean onlyIfLeader;
+
public WaitForState() {
action = CoreAdminAction.PREPRECOVERY;
}
@@ -166,6 +168,14 @@
this.pauseFor = pauseFor;
}
+ public boolean isOnlyIfLeader() {
+ return onlyIfLeader;
+ }
+
+ public void setOnlyIfLeader(boolean onlyIfLeader) {
+ this.onlyIfLeader = onlyIfLeader;
+ }
+
@Override
public SolrParams getParams() {
if( action == null ) {
@@ -195,6 +205,10 @@
if (pauseFor != null) {
params.set( "pauseFor", pauseFor);
}
+
+ if (onlyIfLeader != null) {
+ params.set( "onlyIfLeader", onlyIfLeader);
+ }
return params;
}
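The CoreAdminRequest.WaitForState change adds an optional onlyIfLeader flag to the PREPRECOVERY request; combined with the CoreAdminHandler hunk above, a core that does not believe it is the shard leader now rejects the request with BAD_REQUEST, and RecoveryStrategy sets the flag on its prep command. A hedged client-side sketch (not part of the patch; the URL, core name, and state string are placeholders, and only setters visible in this patch plus setCoreName are used):

// Illustrative sketch (not part of the patch): issuing PREPRECOVERY with the new
// onlyIfLeader flag, roughly as RecoveryStrategy does after this patch.
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;

public class PrepRecoveryExample {
  public static void main(String[] args) throws Exception {
    WaitForState prepCmd = new WaitForState();        // action = PREPRECOVERY
    prepCmd.setCoreName("collection1");               // placeholder core name
    prepCmd.setCoreNodeName("127.0.0.1:8983_solr_collection1"); // placeholder
    prepCmd.setState("recovering");                   // i.e. ZkStateReader.RECOVERING
    prepCmd.setCheckLive(true);
    prepCmd.setOnlyIfLeader(true);                    // flag added by this patch
    prepCmd.setPauseFor(6000);

    HttpSolrServer server = new HttpSolrServer("http://127.0.0.1:8983/solr"); // placeholder URL
    server.request(prepCmd);   // the leader answers; a non-leader now returns BAD_REQUEST
  }
}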