Skip to content

Instantly share code, notes, and snippets.

@chatman
Created May 24, 2016 15:50
Show Gist options
  • Save chatman/cd78a53c6a64adb4ec88cc6673e2eb0d to your computer and use it in GitHub Desktop.
Save chatman/cd78a53c6a64adb4ec88cc6673e2eb0d to your computer and use it in GitHub Desktop.
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.primes.Primes;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Slow
@SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"/*, "Lucene54", "Memory", "Direct"*/})
@SuppressSSL
public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void beforeSuperClass() {
System.setProperty("solr.commitwithin.softcommit", "false");
schemaString = "schema16.xml"; // we need a docvalues based _version_
}
public TestStressInPlaceUpdates() throws Exception {
super();
sliceCount = 1;
fixShardCount(1);
}
protected final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<>();
protected Map<Integer,DocInfo> committedModel = new HashMap<>();
protected long snapshotCount;
protected long committedModelClock;
protected int clientIndexUsedForCommit;
protected volatile int lastId;
protected final String field = "val_l";
private void initModel(int ndocs) {
for (int i=0; i<ndocs; i++) {
model.put(i, new DocInfo(0l, 0, 0));
}
committedModel.putAll(model);
}
SolrClient leaderClient = null;
@Test
@ShardsFixed(num = 1)
public void stressTest() throws Exception {
waitForRecoveriesToFinish(true);
this.leaderClient = getClientForLeader();
assertNotNull("Couldn't obtain client for the leader of the shard", this.leaderClient);
final int commitPercent = 20;// + random().nextInt(20);
final int softCommitPercent = 30;//+random().nextInt(75); // what percent of the commits are soft
final int deletePercent = 4;//+random().nextInt(25);
final int deleteByQueryPercent = random().nextInt(8);
final int ndocs = 3; // + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
int nWriteThreads = 5 + random().nextInt(25);
int fullUpdatePercent = 5 + random().nextInt(50);
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
final int percentRealtimeQuery = 100;
final AtomicLong operations = new AtomicLong(5000); // number of query operations to perform in total
int nReadThreads = 5 + random().nextInt(25);
nWriteThreads = 1;
nReadThreads = 1;
initModel(3);
final AtomicInteger numCommitting = new AtomicInteger();
List<Thread> threads = new ArrayList<>();
Thread writerThread = new Thread("WRITER") {
Random rand = new Random(random().nextInt());
void commit (boolean soft) {
Map<Integer,DocInfo> newCommittedModel;
long version;
synchronized(TestStressInPlaceUpdates.this) {
newCommittedModel = new HashMap<>(model); // take a snapshot
version = snapshotCount++;
}
int chosenClientIndex = rand.nextInt(clients.size());
if (soft) {
verbose("softCommit start");
log.info("Doing a soft commit.");
try {
clients.get(chosenClientIndex).commit(true, true, true);
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
}
verbose("softCommit end");
} else {
verbose("hardCommit start");
log.info("Doing a hard commit.");
try {
clients.get(chosenClientIndex).commit();
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
}
verbose("hardCommit end");
}
synchronized(TestStressInPlaceUpdates.this) {
// install this model snapshot only if it's newer than the current one
if (version >= committedModelClock) {
if (VERBOSE) {
verbose("installing new committedModel version="+committedModelClock);
}
clientIndexUsedForCommit = chosenClientIndex;
committedModel = newCommittedModel;
committedModelClock = version;
}
}
}
void index(int id, int nextVal1, long nextVal2, Object ...fields) {
try {
DocInfo info = model.get(id);
long returnedVersion = addDocAndGetVersion(fields);
log.info("FULL: Writing id="+id+", val=["+nextVal1+","+nextVal2+"], version="+info.version+", Prev was=["+info.val1+","+info.val2+"]. Returned version="+returnedVersion);
// only update model if the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (returnedVersion > currInfo.version) {
model.put(id, new DocInfo(returnedVersion, nextVal1, nextVal2));
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("",e);
throw new RuntimeException(e);
}
}
@Override
public void run() {
index(0, 3, 3000000000l, "id", 0, "val1_i_dvo", 3, "val2_l_dvo", 3000000000l);
index(0, 3, 3000000003l, "id", 0, "val2_l_dvo", createMap("inc", 3));
index(0, 3, 3000000006l, "id", 0, "val2_l_dvo", createMap("inc", 3));
index(0, 3, 3000000009l, "id", 0, "val2_l_dvo", createMap("inc", 3));
commit(false);
index(0, 3, 3000000012l, "id", 0, "val2_l_dvo", createMap("inc", 3));
index(1, 2, 2000000000, "id", 1, "val1_i_dvo", 2, "val2_l_dvo", 2000000000);
index(1, 2, 2000000002, "id", 1, "val2_l_dvo", createMap("inc", 2));
index(1, 3, 3000000000l, "id", 1, "val1_i_dvo", 3, "val2_l_dvo", 3000000000l);
index(0, 3, 3000000015l, "id", 0, "val2_l_dvo", createMap("inc", 3));
index(0, 3, 3000000018l, "id", 0, "val2_l_dvo", createMap("inc", 3));
commit(false);
}
};
threads.add(writerThread);
// Read threads
for (int i=0; i<nReadThreads; i++) {
Thread thread = new Thread("READER"+i) {
Random rand = new Random(random().nextInt());
@Override
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(3);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
DocInfo info;
if (realTime) {
info = model.get(id);
} else {
synchronized(TestStressInPlaceUpdates.this) {
info = committedModel.get(id);
}
}
if (VERBOSE) {
verbose("querying id", id);
}
ModifiableSolrParams params = new ModifiableSolrParams();
if (realTime) {
params.set("wt","json");
params.set("qt","/get");
params.set("ids",Integer.toString(id));
} else {
params.set("wt","json");
params.set("q","id:"+Integer.toString(id));
params.set("omitHeader","true");
}
int clientId = rand.nextInt(clients.size());
if (!realTime) clientId = clientIndexUsedForCommit;
QueryResponse response = clients.get(clientId).query(params);
assertTrue("ZRQ, info="+info, info.version==0 || response.getResults().size()==1);
if (response.getResults().size()==1) {
assertNotNull("Realtime="+realTime+", Response is: "+response+", model: "+info,
response.getResults().get(0).get("val2_l_dvo"));
Object obj1 = response.getResults().get(0).getFirstValue("val1_i_dvo");
int val1 = obj1==null? 0:
((obj1 instanceof ArrayList)? ((ArrayList<Integer>)obj1).get(0): (Integer)obj1);
Object obj2 = response.getResults().get(0).getFirstValue("val2_l_dvo");
long val2 = (obj2 instanceof ArrayList)? ((ArrayList<Long>)obj2).get(0): (Long)obj2;
Object objVer= response.getResults().get(0).getFirstValue("_version_");
long foundVer = (objVer instanceof ArrayList)? ((ArrayList<Long>)objVer).get(0): (Long)objVer;
if (!(val1==0 && val2==0 || val2%val1==0)) {
assertTrue("Vals are: "+val1+", "+val2+", id="+id+ ", clientId="+clients.get(clientId)+", Doc retrived is: "+response.toString(),
val1==0 && val2==0 || val2%val1==0);
}
if (foundVer < Math.abs(info.version)
|| (foundVer == info.version && (val1 != info.val1 || val2 != info.val2)) ) { // if the version matches, the val must
log.error("Realtime="+realTime+", ERROR, id=" + id + " found=" + response + " model=" + info);
assertTrue("Realtime="+realTime+", ERROR, id=" + id + " found=" + response + " model=" + info, false);
}
} else {
//fail("Were were results: "+response);
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("",e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
// Start all threads
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
}
class DocInfo {
long version;
int val1;
long val2;
public DocInfo(long version, int val1, long val2) {
this.version = version;
this.val1 = val1;
this.val2 = val2;
}
@Override
public String toString() {
return "["+version+", "+val1+", "+val2+"]";
}
}
public static void verbose(Object... args) {
// if (!log.isDebugEnabled()) return;
StringBuilder sb = new StringBuilder("VERBOSE:");
for (Object o : args) {
sb.append(' ');
sb.append(o==null ? "(null)" : o.toString());
}
log.info(sb.toString());
}
protected long addDocAndGetVersion(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
log.info("Writing doc: "+doc);
// Pick a client to send the document to (Not used due to SOLR-8733)
int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % clients.size();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("versions", "true");
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(params);
ureq.add(doc);
UpdateResponse resp;
synchronized (cloudClient) {
// send updates to leader, to avoid SOLR-8733
// resp = ureq.process(clients.get(which));
resp = ureq.process(leaderClient);
}
long returnedVersion = Long.parseLong(((NamedList)resp.getResponse().get("adds")).getVal(0).toString());
assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ " worked around that problem here.", returnedVersion > 0);
return returnedVersion;
}
public SolrClient getClientForLeader() throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
Replica leader = null;
Slice shard1 = clusterState.getSlice(DEFAULT_COLLECTION, SHARD1);
leader = shard1.getLeader();
for (int i=0; i<clients.size(); i++) {
String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName());
if (((HttpSolrClient)clients.get(i)).getBaseURL().startsWith(leaderBaseUrl))
return clients.get(i);
}
return null;
}
/**
* Strings at even index are keys, odd-index strings are values in the
* returned map
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private Map createMap(Object... args) {
Map result = new LinkedHashMap();
if (args == null || args.length == 0)
return result;
for (int i = 0; i < args.length - 1; i += 2)
result.put(args[i], args[i + 1]);
return result;
}
@Override
public void tearDown() throws Exception {
super.tearDown();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment