Created
August 21, 2012 21:09
-
-
Save markrmiller/3419410 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: solr/testlogging.properties | |
=================================================================== | |
--- solr/testlogging.properties (revision 1375795) | |
+++ solr/testlogging.properties (working copy) | |
@@ -4,15 +4,22 @@ | |
java.util.logging.ConsoleHandler.formatter=org.apache.solr.SolrLogFormatter | |
-#.level=SEVERE | |
-.level=INFO | |
+.level=SEVERE | |
+#.level=INFO | |
-#org.apache.solr.update.processor.LogUpdateProcessor=FINEST | |
-#org.apache.solr.update.processor.DistributedUpdateProcessor=FINEST | |
+#org.apache.solr.update.processor.LogUpdateProcessor.level=FINEST | |
+#org.apache.solr.update.processor.DistributedUpdateProcessor.level=FINEST | |
+org.apache.solr.update.DefaultSolrCoreState.level=FINEST | |
+org.apache.solr.cloud.ZkController.level=FINEST | |
+org.apache.solr.cloud.ShardLeaderElectionContext.level=FINEST | |
#org.apache.solr.update.PeerSync.level=FINEST | |
-#org.apache.solr.cloud.RecoveryStrategy.level=FINEST | |
-#org.apache.solr.cloud.SyncStrategy.level=FINEST | |
-#org.apache.solr.update.DefaultSolrCoreState.level=FINEST | |
+org.apache.solr.cloud.RecoveryStrategy.level=FINEST | |
+org.apache.solr.cloud.SyncStrategy.level=FINEST | |
+org.apache.solr.handler.admin.CoreAdminHandler.level=FINEST | |
+org.apache.solr.common.cloud.ConnectionManager.level=FINEST | |
+org.apache.solr.update.SolrCmdDistributor.level=FINEST | |
#org.apache.solr.update.UpdateLog.level=FINE | |
#org.apache.solr.update.TransactionLog.level=FINEST | |
+org.apache.solr.cloud.ChaosMonkey.level=FINEST | |
+ | |
Index: solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java | |
=================================================================== | |
--- solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (revision 1375795) | |
+++ solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (working copy) | |
@@ -38,11 +38,11 @@ | |
import org.slf4j.LoggerFactory; | |
@Slow | |
-@Ignore("ignore while investigating jenkins fails") | |
+//@Ignore("ignore while investigating jenkins fails") | |
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase { | |
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class); | |
- private static final int BASE_RUN_LENGTH = 45000; | |
+ private static final int BASE_RUN_LENGTH = 8000; | |
@BeforeClass | |
public static void beforeSuperClass() { | |
@@ -56,8 +56,8 @@ | |
@Override | |
public void setUp() throws Exception { | |
super.setUp(); | |
- // TODO use @Noisy annotation as we expect lots of exceptions | |
- //ignoreException(".*"); | |
+ // can help to hide this when testing and looking at logs | |
+ //ignoreException("shard update error"); | |
System.setProperty("numShards", Integer.toString(sliceCount)); | |
} | |
@@ -71,8 +71,8 @@ | |
public ChaosMonkeyNothingIsSafeTest() { | |
super(); | |
- sliceCount = 3; | |
- shardCount = 12; | |
+ sliceCount = 1; | |
+ shardCount = 5; | |
} | |
@Override | |
@@ -83,9 +83,16 @@ | |
handle.put("QTime", SKIPVAL); | |
handle.put("timestamp", SKIPVAL); | |
+ // make sure we have leaders for each shard before starting; the bound is | |
+ // inclusive because the loop addresses shards as shard1..shard<sliceCount> | |
+ // (assumes 1-based shard names, as the starting index implies) | |
+ for (int j = 1; j <= sliceCount; j++) { | |
+ zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000); | |
+ } | |
+ | |
+ waitForRecoveriesToFinish(false); | |
+ | |
// we cannot do delete by query | |
// as it's not supported for recovery | |
- // del("*:*"); | |
+ del("*:*"); | |
List<StopableThread> threads = new ArrayList<StopableThread>(); | |
int threadCount = 1; | |
@@ -152,6 +159,7 @@ | |
zkStateReader.updateClusterState(true); | |
assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0); | |
+ | |
// we dont't current check vs control because the full throttle thread can | |
// have request fails | |
checkShardConsistency(false, true); | |
Index: solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java | |
=================================================================== | |
--- solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (revision 1375795) | |
+++ solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (working copy) | |
@@ -50,11 +50,6 @@ | |
@Override | |
public void setUp() throws Exception { | |
super.setUp(); | |
- // we expect this time of exception as shards go up and down... | |
- //ignoreException(".*"); | |
- | |
- // sometimes we cannot get the same port | |
- ignoreException("java\\.net\\.BindException: Address already in use"); | |
System.setProperty("numShards", Integer.toString(sliceCount)); | |
} | |
Index: solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (working copy) | |
@@ -183,15 +183,9 @@ | |
// set num nodes | |
numNodes = zkController.getClusterState().getLiveNodes().size(); | |
- // the leader is... | |
- // TODO: if there is no leader, wait and look again | |
- // TODO: we are reading the leader from zk every time - we should cache | |
- // this and watch for changes?? Just pull it from ZkController cluster state probably? | |
String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash... | |
try { | |
- // TODO: if we find out we cannot talk to zk anymore, we should probably realize we are not | |
- // a leader anymore - we shouldn't accept updates at all?? | |
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps( | |
collection, shardId)); | |
@@ -201,7 +195,21 @@ | |
isLeader = coreNodeName.equals(leaderNodeName); | |
DistribPhase phase = | |
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); | |
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); | |
+ | |
+ // defensive checks | |
+ boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader(); | |
+ if (DistribPhase.FROMLEADER == phase && localIsLeader) { | |
+ log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); | |
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader"); | |
+ } | |
+ | |
+ // cluster state and the local leader flag disagree: reject the request | |
+ // (the exception must actually be thrown, not just constructed) | |
+ if (isLeader && !localIsLeader) { | |
+ log.error("Our cloud state says we are the leader, but locally we don't think so :" + req.getParamString()); | |
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Our cloud state says we are the leader, but locally we don't think so"); | |
+ } | |
+ // done defensive checks | |
+ | |
if (DistribPhase.FROMLEADER == phase) { | |
// we are coming from the leader, just go local - add no urls | |
@@ -329,6 +337,8 @@ | |
DistribPhase.FROMLEADER.toString() : | |
DistribPhase.TOLEADER.toString())); | |
params.remove("commit"); // this will be distributed from the local commit | |
+ // tag the forwarded add with the url of the sending core, using the same | |
+ // "update.from" key and bare-url value that the delete path sets | |
+ params.set("update.from", ZkCoreNodeProps.getCoreUrl( | |
+ zkController.getBaseUrl(), req.getCore().getName())); | |
cmdDistrib.distribAdd(cmd, nodes, params); | |
} | |
@@ -378,9 +388,11 @@ | |
// TODO: we should do this in the background it would seem | |
for (SolrCmdDistributor.Error error : response.errors) { | |
- if (error.node instanceof RetryNode) { | |
+ if (error.node instanceof RetryNode || error.e instanceof SolrException) { | |
// we don't try to force a leader to recover | |
// when we cannot forward to it | |
+ // and we assume SolrException means | |
+ // the node went down | |
continue; | |
} | |
// TODO: we should force their state to recovering ?? | |
@@ -819,6 +831,8 @@ | |
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); | |
params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); | |
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); | |
+ params.set("update.from", ZkCoreNodeProps.getCoreUrl( | |
+ zkController.getBaseUrl(), req.getCore().getName())); | |
cmdDistrib.distribDelete(cmd, replicas, params); | |
cmdDistrib.finish(); | |
} | |
@@ -949,15 +963,17 @@ | |
if (zkEnabled) { | |
zkCheck(); | |
} | |
- | |
+ log.debug("processCommit received"); | |
if (vinfo != null) { | |
vinfo.lockForUpdate(); | |
} | |
try { | |
if (ulog == null || ulog.getState() == UpdateLog.State.ACTIVE || (cmd.getFlags() & UpdateCommand.REPLAY) != 0) { | |
+ log.debug("Committing locally"); | |
super.processCommit(cmd); | |
} else { | |
log.info("Ignoring commit while not ACTIVE - state: " + ulog.getState() + " replay:" + (cmd.getFlags() & UpdateCommand.REPLAY)); | |
} | |
Index: solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (working copy) | |
@@ -166,6 +166,8 @@ | |
addCommit(ureq, cmd); | |
+ log.info("Distrib commit to:" + nodes); | |
+ | |
for (Node node : nodes) { | |
submit(ureq, node); | |
} | |
@@ -345,7 +347,8 @@ | |
try { | |
semaphore.acquire(); | |
} catch (InterruptedException e) { | |
- throw new RuntimeException(); | |
+ Thread.currentThread().interrupt(); | |
+ throw new RuntimeException("Update thread interrupted"); | |
} | |
pending.add(completionService.submit(task)); | |
Index: solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (working copy) | |
@@ -25,6 +25,7 @@ | |
import java.util.Random; | |
import javax.servlet.DispatcherType; | |
+import javax.servlet.Filter; | |
import javax.servlet.http.HttpServlet; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
@@ -40,6 +41,7 @@ | |
import org.eclipse.jetty.util.component.LifeCycle; | |
import org.eclipse.jetty.util.log.Logger; | |
import org.eclipse.jetty.util.thread.QueuedThreadPool; | |
+import org.eclipse.jetty.util.thread.ThreadPool; | |
/** | |
* Run solr using jetty | |
@@ -207,8 +209,8 @@ | |
} | |
if (!server.isRunning()) { | |
- server.start(); | |
RUNNING_JETTIES.put(this, new RuntimeException()); | |
+ server.start(); | |
} | |
synchronized (JettySolrRunner.this) { | |
int cnt = 0; | |
@@ -225,10 +227,32 @@ | |
} | |
public void stop() throws Exception { | |
- if (!server.isStopped() && !server.isStopping()) { | |
- server.stop(); | |
- RUNNING_JETTIES.remove(this); | |
+ // we try and do a bunch of extra stop stuff because | |
+ // jetty doesn't like to stop if it started | |
+ // and ended up in a failure state (like when it cannot get the port) | |
+ if (server.getState().equals(Server.FAILED)) { | |
+ Connector[] connectors = server.getConnectors(); | |
+ for (Connector connector : connectors) { | |
+ connector.stop(); | |
+ } | |
} | |
+ Filter filter = dispatchFilter.getFilter(); | |
+ ThreadPool threadPool = server.getThreadPool(); | |
+ server.getServer().stop(); | |
+ server.stop(); | |
+ if (threadPool instanceof QueuedThreadPool) { | |
+ ((QueuedThreadPool) threadPool).setMaxStopTimeMs(15000); | |
+ ((QueuedThreadPool) threadPool).stop(); | |
+ ((QueuedThreadPool) threadPool).stop(); | |
+ ((QueuedThreadPool) threadPool).stop(); | |
+ } | |
+ //server.destroy(); | |
+ if (server.getState().equals(Server.FAILED)) { | |
+ filter.destroy(); | |
+ } | |
+ | |
+ RUNNING_JETTIES.remove(this); | |
+ | |
server.join(); | |
} | |
Index: solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (working copy) | |
@@ -20,6 +20,7 @@ | |
import org.apache.lucene.document.Document; | |
import org.apache.lucene.index.IndexableField; | |
import org.apache.lucene.index.Term; | |
+import org.apache.lucene.search.MatchAllDocsQuery; | |
import org.apache.lucene.util.BytesRef; | |
import org.apache.solr.client.solrj.SolrResponse; | |
import org.apache.solr.client.solrj.util.ClientUtils; | |
@@ -47,6 +48,7 @@ | |
import org.apache.solr.schema.SchemaField; | |
import org.apache.solr.search.ReturnFields; | |
import org.apache.solr.search.SolrIndexSearcher; | |
+import org.apache.solr.update.CommitUpdateCommand; | |
import org.apache.solr.update.DocumentBuilder; | |
import org.apache.solr.update.PeerSync; | |
import org.apache.solr.update.UpdateLog; | |
@@ -507,6 +509,24 @@ | |
String sync = params.get("sync"); | |
if (sync != null) { | |
processSync(rb, nVersions, sync); | |
+// try { | |
+// RefCounted<SolrIndexSearcher> searchHolder = | |
+// rb.req.getCore().getNewestSearcher(false); | |
+// SolrIndexSearcher searcher = searchHolder.get(); | |
+// try { | |
+// CommitUpdateCommand cuc = new CommitUpdateCommand(req, false); | |
+// //cuc.softCommit = true; | |
+// cuc.waitSearcher = true; | |
+// rb.req.getCore().getUpdateHandler().commit(cuc); | |
+// System.out.println(rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getNodeName() | |
+// + " synched " | |
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits); | |
+// } finally { | |
+// searchHolder.decref(); | |
+// } | |
+// } catch (Exception e) { | |
+// | |
+// } | |
return; | |
} | |
Index: solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (working copy) | |
@@ -28,6 +28,7 @@ | |
import org.apache.commons.io.FileUtils; | |
import org.apache.lucene.index.DirectoryReader; | |
+import org.apache.lucene.search.MatchAllDocsQuery; | |
import org.apache.lucene.store.Directory; | |
import org.apache.lucene.util.IOUtils; | |
import org.apache.solr.cloud.CloudDescriptor; | |
@@ -698,7 +699,7 @@ | |
protected void handleRequestSyncAction(SolrQueryRequest req, | |
SolrQueryResponse rsp) throws IOException { | |
final SolrParams params = req.getParams(); | |
log.info("I have been requested to sync up my shard"); | |
ZkController zkController = coreContainer.getZkController(); | |
if (zkController == null) { | |
@@ -721,6 +722,21 @@ | |
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName()); | |
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props)); | |
+ // solrcloud_debug | |
+// try { | |
+// RefCounted<SolrIndexSearcher> searchHolder = | |
+// core.getNewestSearcher(false); | |
+// SolrIndexSearcher searcher = searchHolder.get(); | |
+// try { | |
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() | |
+// + " synched " | |
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits); | |
+// } finally { | |
+// searchHolder.decref(); | |
+// } | |
+// } catch (Exception e) { | |
+// | |
+// } | |
if (!success) { | |
throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed"); | |
} | |
@@ -750,8 +766,11 @@ | |
String coreNodeName = params.get("coreNodeName"); | |
String waitForState = params.get("state"); | |
Boolean checkLive = params.getBool("checkLive"); | |
+ Boolean onlyIfLeader = params.getBool("onlyIfLeader"); | |
int pauseFor = params.getInt("pauseFor", 0); | |
+ | |
+ | |
String state = null; | |
boolean live = false; | |
int retry = 0; | |
@@ -764,6 +783,12 @@ | |
+ cname); | |
} | |
if (core != null) { | |
+ if (onlyIfLeader != null && onlyIfLeader) { | |
+ if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) { | |
+ throw new SolrException(ErrorCode.BAD_REQUEST, "We are not the leader"); | |
+ } | |
+ } | |
+ | |
// wait until we are sure the recovering node is ready | |
// to accept updates | |
CloudDescriptor cloudDescriptor = core.getCoreDescriptor() | |
Index: solr/core/src/java/org/apache/solr/core/CoreContainer.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/core/CoreContainer.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/core/CoreContainer.java (working copy) | |
@@ -598,15 +598,16 @@ | |
} | |
cores.clear(); | |
} finally { | |
+ if (shardHandlerFactory != null) { | |
+ shardHandlerFactory.close(); | |
+ } | |
+ // we want to close zk stuff last | |
if(zkController != null) { | |
zkController.close(); | |
} | |
if (zkServer != null) { | |
zkServer.stop(); | |
} | |
- if (shardHandlerFactory != null) { | |
- shardHandlerFactory.close(); | |
- } | |
} | |
} | |
} | |
Index: solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (working copy) | |
@@ -185,6 +185,7 @@ | |
prepCmd.setCoreNodeName(coreZkNodeName); | |
prepCmd.setState(ZkStateReader.RECOVERING); | |
prepCmd.setCheckLive(true); | |
+ prepCmd.setOnlyIfLeader(true); | |
prepCmd.setPauseFor(6000); | |
server.request(prepCmd); | |
@@ -305,7 +306,10 @@ | |
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); | |
boolean isLeader = leaderUrl.equals(ourUrl); | |
- if (isLeader) { | |
+ if (isLeader && !cloudDesc.isLeader) { | |
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader."); | |
+ } | |
+ if (cloudDesc.isLeader) { | |
// we are now the leader - no one else must have been suitable | |
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName); | |
log.info("Finished recovery process. core=" + coreName); | |
@@ -333,9 +337,6 @@ | |
new ModifiableSolrParams()); | |
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false)); | |
log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName); | |
- // System.out | |
- // .println("Sync Recovery was successful - registering as Active " | |
- // + zkController.getNodeName()); | |
// solrcloud_debug | |
// try { | |
Index: solr/core/src/java/org/apache/solr/cloud/ElectionContext.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (working copy) | |
@@ -3,6 +3,7 @@ | |
import java.io.IOException; | |
import java.util.Map; | |
+import org.apache.lucene.search.MatchAllDocsQuery; | |
import org.apache.solr.common.SolrException; | |
import org.apache.solr.common.SolrException.ErrorCode; | |
import org.apache.solr.common.cloud.ClusterState; | |
@@ -13,6 +14,8 @@ | |
import org.apache.solr.common.cloud.ZkStateReader; | |
import org.apache.solr.core.CoreContainer; | |
import org.apache.solr.core.SolrCore; | |
+import org.apache.solr.search.SolrIndexSearcher; | |
+import org.apache.solr.util.RefCounted; | |
import org.apache.zookeeper.CreateMode; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.KeeperException.NodeExistsException; | |
@@ -157,13 +160,28 @@ | |
// first cancel any current recovery | |
core.getUpdateHandler().getSolrCoreState().cancelRecovery(); | |
boolean success = syncStrategy.sync(zkController, core, leaderProps); | |
+ // solrcloud_debug | |
+// try { | |
+// RefCounted<SolrIndexSearcher> searchHolder = | |
+// core.getNewestSearcher(false); | |
+// SolrIndexSearcher searcher = searchHolder.get(); | |
+// try { | |
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() | |
+// + " synched " | |
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits); | |
+// } finally { | |
+// searchHolder.decref(); | |
+// } | |
+// } catch (Exception e) { | |
+// | |
+// } | |
if (!success && anyoneElseActive()) { | |
rejoinLeaderElection(leaderSeqPath, core); | |
return; | |
} | |
} | |
log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps)); | |
- | |
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = true; | |
// If I am going to be the leader I have to be active | |
core.getUpdateHandler().getSolrCoreState().cancelRecovery(); | |
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE); | |
Index: solr/core/src/java/org/apache/solr/cloud/ZkController.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/ZkController.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/cloud/ZkController.java (working copy) | |
@@ -41,6 +41,7 @@ | |
import org.apache.solr.common.cloud.ClusterState; | |
import org.apache.solr.common.cloud.OnReconnect; | |
import org.apache.solr.common.cloud.SolrZkClient; | |
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy; | |
import org.apache.solr.common.cloud.ZkCmdExecutor; | |
import org.apache.solr.common.cloud.ZkCoreNodeProps; | |
import org.apache.solr.common.cloud.ZkNodeProps; | |
@@ -209,6 +210,45 @@ | |
}); | |
+ | |
+ zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() { | |
+ | |
+ @Override | |
+ public void disconnected() { | |
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors(); | |
+ // on disconnect, clear the local leader flag of every currently registered core descriptor | |
+ if (descriptors != null) { | |
+ for (CoreDescriptor descriptor : descriptors) { | |
+ descriptor.getCloudDescriptor().isLeader = false; | |
+ } | |
+ } | |
+ } | |
+ }); | |
+ | |
+ zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() { | |
+ | |
+ @Override | |
+ public void connected() { | |
+ // on connect, recompute each registered core's local leader flag by | |
+ // comparing its own url with the leader url read from zk | |
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors(); | |
+ if (descriptors != null) { | |
+ for (CoreDescriptor descriptor : descriptors) { | |
+ CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); | |
+ String leaderUrl; | |
+ try { | |
+ leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId()) | |
+ .getCoreUrl(); | |
+ } catch (InterruptedException e) { | |
+ // restore the interrupt status and keep the cause for diagnosis | |
+ Thread.currentThread().interrupt(); | |
+ throw new RuntimeException("Interrupted while looking up the shard leader", e); | |
+ } | |
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName()); | |
+ boolean isLeader = leaderUrl.equals(ourUrl); | |
+ log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl); | |
+ cloudDesc.isLeader = isLeader; | |
+ } | |
+ } | |
+ } | |
+ }); | |
+ | |
this.overseerJobQueue = Overseer.getInQueue(zkClient); | |
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient); | |
cmdExecutor = new ZkCmdExecutor(); | |
@@ -531,25 +571,7 @@ | |
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); | |
} | |
- // rather than look in the cluster state file, we go straight to the zknodes | |
- // here, because on cluster restart there could be stale leader info in the | |
- // cluster state node that won't be updated for a moment | |
- String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl(); | |
- | |
- // now wait until our currently cloud state contains the latest leader | |
- String clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000); | |
- int tries = 0; | |
- while (!leaderUrl.equals(clusterStateLeader)) { | |
- if (tries == 60) { | |
- throw new SolrException(ErrorCode.SERVER_ERROR, | |
- "There is conflicting information about the leader of shard: " | |
- + cloudDesc.getShardId() + " our state says:" + clusterStateLeader + " but zookeeper says:" + leaderUrl); | |
- } | |
- Thread.sleep(1000); | |
- tries++; | |
- clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000); | |
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl(); | |
- } | |
+ String leaderUrl = getLeader(cloudDesc); | |
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); | |
log.info("We are " + ourUrl + " and leader is " + leaderUrl); | |
@@ -604,6 +626,45 @@ | |
return shardId; | |
} | |
+ | |
+ /** | |
+ * Reads the shard leader url directly from the zk nodes, then waits until the | |
+ * cluster state reports the same leader (re-checking once per second, up to | |
+ * 60 times). | |
+ * | |
+ * @param cloudDesc supplies the collection name and shard id to look up | |
+ * @return the core url of the shard leader | |
+ * @throws SolrException if the lookup fails or is interrupted, or if the two | |
+ * sources still disagree after the retries | |
+ */ | |
+ private String getLeader(final CloudDescriptor cloudDesc) { | |
+ | |
+ String collection = cloudDesc.getCollectionName(); | |
+ String shardId = cloudDesc.getShardId(); | |
+ // rather than look in the cluster state file, we go straight to the zknodes | |
+ // here, because on cluster restart there could be stale leader info in the | |
+ // cluster state node that won't be updated for a moment | |
+ String leaderUrl; | |
+ try { | |
+ leaderUrl = getLeaderProps(collection, shardId) | |
+ .getCoreUrl(); | |
+ | |
+ // now wait until our currently cloud state contains the latest leader | |
+ String clusterStateLeader = zkStateReader.getLeaderUrl(collection, | |
+ shardId, 30000); | |
+ int tries = 0; | |
+ while (!leaderUrl.equals(clusterStateLeader)) { | |
+ if (tries == 60) { | |
+ throw new SolrException(ErrorCode.SERVER_ERROR, | |
+ "There is conflicting information about the leader of shard: " | |
+ + shardId + " our state says:" | |
+ + clusterStateLeader + " but zookeeper says:" + leaderUrl); | |
+ } | |
+ Thread.sleep(1000); | |
+ tries++; | |
+ clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, | |
+ 30000); | |
+ leaderUrl = getLeaderProps(collection, shardId) | |
+ .getCoreUrl(); | |
+ } | |
+ | |
+ } catch (InterruptedException e) { | |
+ // restore the interrupt status before reporting the failure | |
+ Thread.currentThread().interrupt(); | |
+ log.error("Error getting leader from zk", e); | |
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
+ "Error getting leader from zk", e); | |
+ } catch (Exception e) { | |
+ log.error("Error getting leader from zk", e); | |
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | |
+ "Error getting leader from zk", e); | |
+ } | |
+ return leaderUrl; | |
+ } | |
/** | |
* Get leader props directly from zk nodes. | |
@@ -615,8 +676,9 @@ | |
* @throws InterruptedException | |
*/ | |
private ZkCoreNodeProps getLeaderProps(final String collection, | |
- final String slice) throws KeeperException, InterruptedException { | |
+ final String slice) throws InterruptedException { | |
int iterCount = 60; | |
+ Exception exp = null; | |
while (iterCount-- > 0) { | |
try { | |
byte[] data = zkClient.getData( | |
@@ -625,11 +687,17 @@ | |
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps( | |
ZkNodeProps.load(data)); | |
return leaderProps; | |
- } catch (NoNodeException e) { | |
+ } catch (InterruptedException e) { | |
+ throw e; | |
+ } catch (Exception e) { | |
+ exp = e; | |
Thread.sleep(500); | |
} | |
+ if (cc.isShutDown()) { | |
+ throw new RuntimeException("CoreContainer is shutdown"); | |
+ } | |
} | |
- throw new RuntimeException("Could not get leader props"); | |
+ throw new RuntimeException("Could not get leader props", exp); | |
} | |
Index: solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (working copy) | |
@@ -26,7 +26,9 @@ | |
import org.apache.solr.client.solrj.SolrServerException; | |
import org.apache.solr.client.solrj.impl.HttpClientUtil; | |
import org.apache.solr.client.solrj.impl.HttpSolrServer; | |
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION; | |
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery; | |
+import org.apache.solr.client.solrj.request.UpdateRequest; | |
import org.apache.solr.common.SolrException; | |
import org.apache.solr.common.cloud.ClusterState; | |
import org.apache.solr.common.cloud.Slice; | |
@@ -35,6 +37,7 @@ | |
import org.apache.solr.common.cloud.ZkStateReader; | |
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; | |
import org.apache.solr.common.params.ModifiableSolrParams; | |
+import org.apache.solr.common.params.UpdateParams; | |
import org.apache.solr.common.util.NamedList; | |
import org.apache.solr.core.SolrCore; | |
import org.apache.solr.handler.component.HttpShardHandlerFactory; | |
@@ -224,20 +227,40 @@ | |
requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName); | |
- } catch (Exception e) { | |
- SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e); | |
+ } catch (Throwable t) { | |
+ SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t); | |
} | |
} else { | |
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress()); | |
} | |
+ | |
} | |
- | |
+// HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(leaderProps)); | |
+// | |
+// | |
+// ModifiableSolrParams params = new ModifiableSolrParams(); | |
+// | |
+// UpdateRequest ur = new UpdateRequest(); | |
+// ur.setParams(params); | |
+// params.set( UpdateParams.COMMIT, "true" ); | |
+// params.set( UpdateParams.SOFT_COMMIT, false); | |
+// params.set(UpdateParams.OPEN_SEARCHER, false); | |
+// params.set( UpdateParams.WAIT_SEARCHER, false); | |
+// try { | |
+// ur.process(server); | |
+// | |
+// } catch (SolrServerException e) { | |
+// throw new RuntimeException(); | |
+// } catch (IOException e) { | |
+// throw new RuntimeException(); | |
+// } | |
} | |
private boolean handleResponse(ShardResponse srsp) { | |
NamedList<Object> response = srsp.getSolrResponse().getResponse(); | |
// TODO: why does this return null sometimes? | |
+ log.debug("resp:" + response); | |
if (response == null) { | |
return false; | |
} | |
Index: solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java | |
=================================================================== | |
--- solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (revision 1375795) | |
+++ solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (working copy) | |
@@ -26,6 +26,12 @@ | |
private String roles = null; | |
private Integer numShards; | |
+ // local flag recording whether this core currently considers itself the shard leader; | |
+ // it has no setter and is assigned directly as a package-visible field | |
+ volatile boolean isLeader = false; | |
+ | |
+ /** @return the current value of the local leader flag */ | |
+ public boolean isLeader() { | |
+ return isLeader; | |
+ } | |
+ | |
public void setShardId(String shardId) { | |
this.shardId = shardId; | |
} | |
Index: solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java | |
=================================================================== | |
--- solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (revision 1375795) | |
+++ solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (working copy) | |
@@ -801,7 +801,7 @@ | |
SolrDocumentList lst1 = lastJetty.client.solrClient.query(query).getResults(); | |
SolrDocumentList lst2 = cjetty.client.solrClient.query(query).getResults(); | |
- showDiff(lst1, lst2, lastJetty.toString(), cjetty.client.solrClient.toString()); | |
+ showDiff(lst1, lst2, lastJetty.url, cjetty.url); | |
} | |
} | |
@@ -1130,7 +1130,8 @@ | |
try { | |
commit(); | |
- } catch (Exception e) { | |
+ } catch (Throwable t) { | |
+ t.printStackTrace(); | |
// we don't care if this commit fails on some nodes | |
} | |
@@ -1146,8 +1147,8 @@ | |
retry = true; | |
} | |
cnt++; | |
- if (cnt > 2) break; | |
- Thread.sleep(4000); | |
+ if (cnt > 4) break; | |
+ Thread.sleep(2000); | |
} while (retry); | |
} | |
Index: solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java | |
=================================================================== | |
--- solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (revision 1375795) | |
+++ solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (working copy) | |
@@ -17,7 +17,8 @@ | |
* limitations under the License. | |
*/ | |
-import java.net.BindException; | |
+import java.io.IOException; | |
+import java.net.Socket; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
@@ -181,9 +182,7 @@ | |
} | |
jetty.stop(); | |
- if (sdf != null) { | |
- sdf.destroy(); | |
- } | |
+ stop(jetty); | |
if (!jetty.isStopped()) { | |
throw new RuntimeException("could not kill jetty"); | |
@@ -441,6 +440,7 @@ | |
} | |
public static boolean start(JettySolrRunner jetty) throws Exception { | |
+ | |
try { | |
jetty.start(); | |
} catch (Exception e) { | |
@@ -454,7 +454,7 @@ | |
try { | |
jetty.start(); | |
} catch (Exception e3) { | |
- log.error("", e3); | |
+ log.error("Could not get the port to start jetty again", e3); | |
// we coud not get the port | |
jetty.stop(); | |
return false; | |
@@ -463,5 +463,27 @@ | |
} | |
return true; | |
} | |
+ | |
+ private static boolean checkPort(int port) { // probes whether a TCP connection to the given local port succeeds | |
+ boolean running = false; | |
+ Socket socket = null; | |
+ try { | |
+ socket = new Socket((String) null, port); // a null host means the loopback address | |
+ log.info("Port " + port + " is accepting connections"); | |
+ running = true; | |
+ } catch (IOException e) { | |
+ // not running | |
+ // a failed connect is an expected outcome of the probe, so report it through the | |
+ log.info("Port " + port + " is not accepting connections: " + e); // logger instead of stdout plus a stack trace | |
+ } finally { | |
+ if (socket != null) { | |
+ try { | |
+ socket.close(); | |
+ } catch (IOException e) {} // best-effort close of the throwaway probe socket | |
+ } | |
+ } | |
+ | |
+ return running; | |
+ } | |
} | |
\ No newline at end of file | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (working copy) | |
@@ -18,18 +18,65 @@ | |
*/ | |
import java.io.IOException; | |
+import java.util.ArrayList; | |
+import java.util.List; | |
import java.util.concurrent.TimeoutException; | |
+import org.apache.solr.common.SolrException; | |
import org.apache.zookeeper.SolrZooKeeper; | |
import org.apache.zookeeper.Watcher; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
/** | |
* | |
*/ | |
public abstract class ZkClientConnectionStrategy { | |
+ private static Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class); | |
+ | |
+ private List<DisconnectedListener> disconnectedListeners = new ArrayList<DisconnectedListener>(); | |
+ private List<ConnectedListener> connectedListeners = new ArrayList<ConnectedListener>(); | |
+ | |
public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException; | |
public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException; | |
+ public synchronized void disconnected() { // invokes every registered DisconnectedListener in registration order | |
+ for (DisconnectedListener listener : disconnectedListeners) { | |
+ try { | |
+ listener.disconnected(); | |
+ } catch (Throwable t) { // one failing listener must not prevent the remaining ones from running | |
+ SolrException.log(log, "Error while notifying a ZooKeeper disconnected listener", t); | |
+ } | |
+ } | |
+ } | |
+ | |
+ public synchronized void connected() { // invokes every registered ConnectedListener in registration order | |
+ for (ConnectedListener listener : connectedListeners) { | |
+ try { | |
+ listener.connected(); | |
+ } catch (Throwable t) { // one failing listener must not prevent the remaining ones from running | |
+ SolrException.log(log, "Error while notifying a ZooKeeper connected listener", t); | |
+ } | |
+ } | |
+ } | |
+ | |
+ public interface DisconnectedListener { | |
+ public void disconnected(); | |
+ }; | |
+ | |
+ public interface ConnectedListener { | |
+ public void connected(); | |
+ }; | |
+ | |
+ | |
+ public synchronized void addDisconnectedListener(DisconnectedListener listener) { | |
+ disconnectedListeners.add(listener); | |
+ } | |
+ | |
+ public synchronized void addConnectedListener(ConnectedListener listener) { | |
+ connectedListeners.add(listener); | |
+ } | |
+ | |
public static abstract class ZkUpdate { | |
public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException; | |
} | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (working copy) | |
@@ -38,6 +38,8 @@ | |
private boolean connected; | |
private final ZkClientConnectionStrategy connectionStrategy; | |
+ | |
+ private Object connectionUpdateLock = new Object(); | |
private String zkServerAddress; | |
@@ -72,6 +74,7 @@ | |
} | |
if (isClosed) { | |
+ log.info("Client->ZooKeeper status change trigger but we are already closed"); | |
return; | |
} | |
@@ -79,28 +82,25 @@ | |
if (state == KeeperState.SyncConnected) { | |
connected = true; | |
clientConnected.countDown(); | |
+ connectionStrategy.connected(); | |
} else if (state == KeeperState.Expired) { | |
connected = false; | |
- log.info("Attempting to reconnect to recover relationship with ZooKeeper..."); | |
- | |
+ log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper..."); | |
+ | |
try { | |
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, | |
new ZkClientConnectionStrategy.ZkUpdate() { | |
@Override | |
public void update(SolrZooKeeper keeper) { | |
// if keeper does not replace oldKeeper we must be sure to close it | |
- synchronized (connectionStrategy) { | |
+ synchronized (connectionUpdateLock) { | |
try { | |
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT); | |
- } catch (InterruptedException e1) { | |
- closeKeeper(keeper); | |
- Thread.currentThread().interrupt(); | |
- throw new RuntimeException("Giving up on connecting - we were interrupted", e1); | |
} catch (Exception e1) { | |
closeKeeper(keeper); | |
throw new RuntimeException(e1); | |
} | |
- | |
+ log.info("Connection with ZooKeeper reestablished."); | |
try { | |
client.updateKeeper(keeper); | |
} catch (InterruptedException e) { | |
@@ -129,7 +129,9 @@ | |
} | |
log.info("Connected:" + connected); | |
} else if (state == KeeperState.Disconnected) { | |
+ log.info("zkClient has disconnected"); | |
connected = false; | |
+ connectionStrategy.disconnected(); | |
} else { | |
connected = false; | |
} | |
@@ -151,19 +153,26 @@ | |
} | |
public synchronized void waitForConnected(long waitForConnection) | |
- throws InterruptedException, TimeoutException { | |
+ throws TimeoutException { | |
+ log.info("Waiting for client to connect to ZooKeeper"); | |
long expire = System.currentTimeMillis() + waitForConnection; | |
long left = 1; | |
while (!connected && left > 0) { | |
if (isClosed) { | |
break; | |
} | |
- wait(500); | |
+ try { | |
+ wait(500); | |
+ } catch (InterruptedException e) { | |
+ Thread.currentThread().interrupt(); | |
+ throw new RuntimeException(e); | |
+ } | |
left = expire - System.currentTimeMillis(); | |
} | |
if (!connected) { | |
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms"); | |
} | |
+ log.info("Client is connected to ZooKeeper"); | |
} | |
public synchronized void waitForDisconnected(long timeout) | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (working copy) | |
@@ -54,4 +54,5 @@ | |
} | |
+ | |
} | |
Index: solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (working copy) | |
@@ -74,6 +74,7 @@ | |
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(); | |
private volatile boolean isClosed = false; | |
+ private ZkClientConnectionStrategy zkClientConnectionStrategy; | |
/** | |
* @param zkServerAddress | |
@@ -116,6 +117,7 @@ | |
*/ | |
public SolrZkClient(String zkServerAddress, int zkClientTimeout, | |
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) { | |
+ this.zkClientConnectionStrategy = strat; | |
connManager = new ConnectionManager("ZooKeeperConnection Watcher:" | |
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect); | |
try { | |
@@ -135,29 +137,24 @@ | |
} | |
} | |
}); | |
- } catch (IOException e) { | |
- connManager.close(); | |
- throw new RuntimeException(); | |
- } catch (InterruptedException e) { | |
- connManager.close(); | |
- throw new RuntimeException(); | |
- } catch (TimeoutException e) { | |
+ } catch (Throwable e) { | |
connManager.close(); | |
throw new RuntimeException(); | |
} | |
+ | |
try { | |
connManager.waitForConnected(clientConnectTimeout); | |
- } catch (InterruptedException e) { | |
- Thread.currentThread().interrupt(); | |
- connManager.close(); | |
- throw new RuntimeException(); | |
- } catch (TimeoutException e) { | |
+ } catch (Throwable e) { | |
connManager.close(); | |
throw new RuntimeException(); | |
} | |
numOpens.incrementAndGet(); | |
} | |
+ public ZkClientConnectionStrategy getZkClientConnectionStrategy() { | |
+ return zkClientConnectionStrategy; | |
+ } | |
+ | |
/** | |
* @return true if client is connected | |
*/ | |
Index: solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (working copy) | |
@@ -242,14 +242,16 @@ | |
theUrlList.addAll(urlList); | |
} | |
Collections.shuffle(theUrlList, rand); | |
- if (replicas != null) { | |
+ if (sendToLeaders) { | |
ArrayList<String> theReplicas = new ArrayList<String>(replicasList.size()); | |
theReplicas.addAll(replicasList); | |
Collections.shuffle(theReplicas, rand); | |
- | |
+ // System.out.println("leaders:" + theUrlList); | |
+ // System.out.println("replicas:" + theReplicas); | |
theUrlList.addAll(theReplicas); | |
} | |
- //System.out.println("########################## MAKING REQUEST TO " + theUrlList); | |
+ | |
+ // System.out.println("########################## MAKING REQUEST TO " + theUrlList); | |
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList); | |
LBHttpSolrServer.Rsp rsp = lbServer.request(req); | |
Index: solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java | |
=================================================================== | |
--- solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (revision 1375795) | |
+++ solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (working copy) | |
@@ -121,7 +121,9 @@ | |
protected String state; | |
protected Boolean checkLive; | |
protected Integer pauseFor; | |
+ protected Boolean onlyIfLeader; | |
+ | |
public WaitForState() { | |
action = CoreAdminAction.PREPRECOVERY; | |
} | |
@@ -166,6 +168,14 @@ | |
this.pauseFor = pauseFor; | |
} | |
+ public boolean isOnlyIfLeader() { // reports false while the flag has not been set | |
+ return Boolean.TRUE.equals(onlyIfLeader); // null-safe: auto-unboxing an unset (null) Boolean would throw NullPointerException | |
+ } | |
+ | |
+ public void setOnlyIfLeader(boolean onlyIfLeader) { | |
+ this.onlyIfLeader = onlyIfLeader; | |
+ } | |
+ | |
@Override | |
public SolrParams getParams() { | |
if( action == null ) { | |
@@ -195,6 +205,10 @@ | |
if (pauseFor != null) { | |
params.set( "pauseFor", pauseFor); | |
} | |
+ | |
+ if (onlyIfLeader != null) { | |
+ params.set( "onlyIfLeader", onlyIfLeader); | |
+ } | |
return params; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment