Created
May 24, 2016 15:50
-
-
Save chatman/cd78a53c6a64adb4ec88cc6673e2eb0d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.solr.cloud; | |
import java.io.IOException; | |
import java.lang.invoke.MethodHandles; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.apache.commons.math3.primes.Primes; | |
import org.apache.lucene.util.LuceneTestCase.Slow; | |
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; | |
import org.apache.solr.SolrTestCaseJ4.SuppressSSL; | |
import org.apache.solr.client.solrj.SolrClient; | |
import org.apache.solr.client.solrj.SolrServerException; | |
import org.apache.solr.client.solrj.impl.HttpSolrClient; | |
import org.apache.solr.client.solrj.request.UpdateRequest; | |
import org.apache.solr.client.solrj.response.QueryResponse; | |
import org.apache.solr.client.solrj.response.UpdateResponse; | |
import org.apache.solr.common.SolrInputDocument; | |
import org.apache.solr.common.cloud.ClusterState; | |
import org.apache.solr.common.cloud.Replica; | |
import org.apache.solr.common.cloud.Slice; | |
import org.apache.solr.common.cloud.ZkStateReader; | |
import org.apache.solr.common.params.ModifiableSolrParams; | |
import org.apache.solr.common.util.NamedList; | |
import org.apache.zookeeper.KeeperException; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
@Slow | |
@SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"/*, "Lucene54", "Memory", "Direct"*/}) | |
@SuppressSSL | |
public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase { | |
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | |
@BeforeClass | |
public static void beforeSuperClass() { | |
System.setProperty("solr.commitwithin.softcommit", "false"); | |
schemaString = "schema16.xml"; // we need a docvalues based _version_ | |
} | |
/**
 * Creates the test with one slice and a fixed shard count of one.
 *
 * @throws Exception declared for the superclass constructor, which is invoked implicitly
 */
public TestStressInPlaceUpdates() throws Exception {
  sliceCount = 1;
  fixShardCount(1);
}
protected final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<>(); | |
protected Map<Integer,DocInfo> committedModel = new HashMap<>(); | |
protected long snapshotCount; | |
protected long committedModelClock; | |
protected int clientIndexUsedForCommit; | |
protected volatile int lastId; | |
protected final String field = "val_l"; | |
private void initModel(int ndocs) { | |
for (int i=0; i<ndocs; i++) { | |
model.put(i, new DocInfo(0l, 0, 0)); | |
} | |
committedModel.putAll(model); | |
} | |
SolrClient leaderClient = null; | |
@Test | |
@ShardsFixed(num = 1) | |
public void stressTest() throws Exception { | |
waitForRecoveriesToFinish(true); | |
this.leaderClient = getClientForLeader(); | |
assertNotNull("Couldn't obtain client for the leader of the shard", this.leaderClient); | |
final int commitPercent = 20;// + random().nextInt(20); | |
final int softCommitPercent = 30;//+random().nextInt(75); // what percent of the commits are soft | |
final int deletePercent = 4;//+random().nextInt(25); | |
final int deleteByQueryPercent = random().nextInt(8); | |
final int ndocs = 3; // + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200)); | |
int nWriteThreads = 5 + random().nextInt(25); | |
int fullUpdatePercent = 5 + random().nextInt(50); | |
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers | |
// query variables | |
final int percentRealtimeQuery = 100; | |
final AtomicLong operations = new AtomicLong(5000); // number of query operations to perform in total | |
int nReadThreads = 5 + random().nextInt(25); | |
nWriteThreads = 1; | |
nReadThreads = 1; | |
initModel(3); | |
final AtomicInteger numCommitting = new AtomicInteger(); | |
List<Thread> threads = new ArrayList<>(); | |
Thread writerThread = new Thread("WRITER") { | |
Random rand = new Random(random().nextInt()); | |
void commit (boolean soft) { | |
Map<Integer,DocInfo> newCommittedModel; | |
long version; | |
synchronized(TestStressInPlaceUpdates.this) { | |
newCommittedModel = new HashMap<>(model); // take a snapshot | |
version = snapshotCount++; | |
} | |
int chosenClientIndex = rand.nextInt(clients.size()); | |
if (soft) { | |
verbose("softCommit start"); | |
log.info("Doing a soft commit."); | |
try { | |
clients.get(chosenClientIndex).commit(true, true, true); | |
} catch (SolrServerException | IOException e) { | |
throw new RuntimeException(e); | |
} | |
verbose("softCommit end"); | |
} else { | |
verbose("hardCommit start"); | |
log.info("Doing a hard commit."); | |
try { | |
clients.get(chosenClientIndex).commit(); | |
} catch (SolrServerException | IOException e) { | |
throw new RuntimeException(e); | |
} | |
verbose("hardCommit end"); | |
} | |
synchronized(TestStressInPlaceUpdates.this) { | |
// install this model snapshot only if it's newer than the current one | |
if (version >= committedModelClock) { | |
if (VERBOSE) { | |
verbose("installing new committedModel version="+committedModelClock); | |
} | |
clientIndexUsedForCommit = chosenClientIndex; | |
committedModel = newCommittedModel; | |
committedModelClock = version; | |
} | |
} | |
} | |
void index(int id, int nextVal1, long nextVal2, Object ...fields) { | |
try { | |
DocInfo info = model.get(id); | |
long returnedVersion = addDocAndGetVersion(fields); | |
log.info("FULL: Writing id="+id+", val=["+nextVal1+","+nextVal2+"], version="+info.version+", Prev was=["+info.val1+","+info.val2+"]. Returned version="+returnedVersion); | |
// only update model if the version is newer | |
synchronized (model) { | |
DocInfo currInfo = model.get(id); | |
if (returnedVersion > currInfo.version) { | |
model.put(id, new DocInfo(returnedVersion, nextVal1, nextVal2)); | |
} | |
} | |
} catch (Throwable e) { | |
operations.set(-1L); | |
log.error("",e); | |
throw new RuntimeException(e); | |
} | |
} | |
@Override | |
public void run() { | |
index(0, 3, 3000000000l, "id", 0, "val1_i_dvo", 3, "val2_l_dvo", 3000000000l); | |
index(0, 3, 3000000003l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
index(0, 3, 3000000006l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
index(0, 3, 3000000009l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
commit(false); | |
index(0, 3, 3000000012l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
index(1, 2, 2000000000, "id", 1, "val1_i_dvo", 2, "val2_l_dvo", 2000000000); | |
index(1, 2, 2000000002, "id", 1, "val2_l_dvo", createMap("inc", 2)); | |
index(1, 3, 3000000000l, "id", 1, "val1_i_dvo", 3, "val2_l_dvo", 3000000000l); | |
index(0, 3, 3000000015l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
index(0, 3, 3000000018l, "id", 0, "val2_l_dvo", createMap("inc", 3)); | |
commit(false); | |
} | |
}; | |
threads.add(writerThread); | |
// Read threads | |
for (int i=0; i<nReadThreads; i++) { | |
Thread thread = new Thread("READER"+i) { | |
Random rand = new Random(random().nextInt()); | |
@Override | |
public void run() { | |
try { | |
while (operations.decrementAndGet() >= 0) { | |
// bias toward a recently changed doc | |
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(3); | |
// when indexing, we update the index, then the model | |
// so when querying, we should first check the model, and then the index | |
boolean realTime = rand.nextInt(100) < percentRealtimeQuery; | |
DocInfo info; | |
if (realTime) { | |
info = model.get(id); | |
} else { | |
synchronized(TestStressInPlaceUpdates.this) { | |
info = committedModel.get(id); | |
} | |
} | |
if (VERBOSE) { | |
verbose("querying id", id); | |
} | |
ModifiableSolrParams params = new ModifiableSolrParams(); | |
if (realTime) { | |
params.set("wt","json"); | |
params.set("qt","/get"); | |
params.set("ids",Integer.toString(id)); | |
} else { | |
params.set("wt","json"); | |
params.set("q","id:"+Integer.toString(id)); | |
params.set("omitHeader","true"); | |
} | |
int clientId = rand.nextInt(clients.size()); | |
if (!realTime) clientId = clientIndexUsedForCommit; | |
QueryResponse response = clients.get(clientId).query(params); | |
assertTrue("ZRQ, info="+info, info.version==0 || response.getResults().size()==1); | |
if (response.getResults().size()==1) { | |
assertNotNull("Realtime="+realTime+", Response is: "+response+", model: "+info, | |
response.getResults().get(0).get("val2_l_dvo")); | |
Object obj1 = response.getResults().get(0).getFirstValue("val1_i_dvo"); | |
int val1 = obj1==null? 0: | |
((obj1 instanceof ArrayList)? ((ArrayList<Integer>)obj1).get(0): (Integer)obj1); | |
Object obj2 = response.getResults().get(0).getFirstValue("val2_l_dvo"); | |
long val2 = (obj2 instanceof ArrayList)? ((ArrayList<Long>)obj2).get(0): (Long)obj2; | |
Object objVer= response.getResults().get(0).getFirstValue("_version_"); | |
long foundVer = (objVer instanceof ArrayList)? ((ArrayList<Long>)objVer).get(0): (Long)objVer; | |
if (!(val1==0 && val2==0 || val2%val1==0)) { | |
assertTrue("Vals are: "+val1+", "+val2+", id="+id+ ", clientId="+clients.get(clientId)+", Doc retrived is: "+response.toString(), | |
val1==0 && val2==0 || val2%val1==0); | |
} | |
if (foundVer < Math.abs(info.version) | |
|| (foundVer == info.version && (val1 != info.val1 || val2 != info.val2)) ) { // if the version matches, the val must | |
log.error("Realtime="+realTime+", ERROR, id=" + id + " found=" + response + " model=" + info); | |
assertTrue("Realtime="+realTime+", ERROR, id=" + id + " found=" + response + " model=" + info, false); | |
} | |
} else { | |
//fail("Were were results: "+response); | |
} | |
} | |
} catch (Throwable e) { | |
operations.set(-1L); | |
log.error("",e); | |
throw new RuntimeException(e); | |
} | |
} | |
}; | |
threads.add(thread); | |
} | |
// Start all threads | |
for (Thread thread : threads) { | |
thread.start(); | |
} | |
for (Thread thread : threads) { | |
thread.join(); | |
} | |
} | |
/**
 * Immutable expectation for one document: the version returned for its last accepted add and
 * the two values that add should have left in the index.
 *
 * <p>Instances are handed from the writer thread to reader threads through the model maps and
 * are never modified after construction, so all fields are {@code final}.
 */
class DocInfo {
  /** Version recorded for the document; the initial model uses 0 before any write. */
  final long version;
  /** Expected value of {@code val1_i_dvo}. */
  final int val1;
  /** Expected value of {@code val2_l_dvo}. */
  final long val2;

  /**
   * @param version version recorded for the document
   * @param val1 expected {@code val1_i_dvo}
   * @param val2 expected {@code val2_l_dvo}
   */
  public DocInfo(long version, int val1, long val2) {
    this.version = version;
    this.val1 = val1;
    this.val2 = val2;
  }

  /** Renders the state as {@code [version, val1, val2]}. */
  @Override
  public String toString() {
    return "[" + version + ", " + val1 + ", " + val2 + "]";
  }
}
public static void verbose(Object... args) { | |
// if (!log.isDebugEnabled()) return; | |
StringBuilder sb = new StringBuilder("VERBOSE:"); | |
for (Object o : args) { | |
sb.append(' '); | |
sb.append(o==null ? "(null)" : o.toString()); | |
} | |
log.info(sb.toString()); | |
} | |
protected long addDocAndGetVersion(Object... fields) throws Exception { | |
SolrInputDocument doc = new SolrInputDocument(); | |
addFields(doc, fields); | |
log.info("Writing doc: "+doc); | |
// Pick a client to send the document to (Not used due to SOLR-8733) | |
int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % clients.size(); | |
ModifiableSolrParams params = new ModifiableSolrParams(); | |
params.add("versions", "true"); | |
UpdateRequest ureq = new UpdateRequest(); | |
ureq.setParams(params); | |
ureq.add(doc); | |
UpdateResponse resp; | |
synchronized (cloudClient) { | |
// send updates to leader, to avoid SOLR-8733 | |
// resp = ureq.process(clients.get(which)); | |
resp = ureq.process(leaderClient); | |
} | |
long returnedVersion = Long.parseLong(((NamedList)resp.getResponse().get("adds")).getVal(0).toString()); | |
assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully" | |
+ " worked around that problem here.", returnedVersion > 0); | |
return returnedVersion; | |
} | |
public SolrClient getClientForLeader() throws KeeperException, InterruptedException { | |
ZkStateReader zkStateReader = cloudClient.getZkStateReader(); | |
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION); | |
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); | |
Replica leader = null; | |
Slice shard1 = clusterState.getSlice(DEFAULT_COLLECTION, SHARD1); | |
leader = shard1.getLeader(); | |
for (int i=0; i<clients.size(); i++) { | |
String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName()); | |
if (((HttpSolrClient)clients.get(i)).getBaseURL().startsWith(leaderBaseUrl)) | |
return clients.get(i); | |
} | |
return null; | |
} | |
/** | |
* Strings at even index are keys, odd-index strings are values in the | |
* returned map | |
*/ | |
@SuppressWarnings({"unchecked", "rawtypes"}) | |
private Map createMap(Object... args) { | |
Map result = new LinkedHashMap(); | |
if (args == null || args.length == 0) | |
return result; | |
for (int i = 0; i < args.length - 1; i += 2) | |
result.put(args[i], args[i + 1]); | |
return result; | |
} | |
@Override | |
public void tearDown() throws Exception { | |
super.tearDown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment