Skip to content

Instantly share code, notes, and snippets.

@daviscook477
Created November 13, 2023 17:24
Show Gist options
  • Save daviscook477/a1b1fbebe75b0fdc41c889e542a2ed42 to your computer and use it in GitHub Desktop.
Save daviscook477/a1b1fbebe75b0fdc41c889e542a2ed42 to your computer and use it in GitHub Desktop.
Test demonstrating large p99 match latencies when search, commit, and purgeCache run concurrently
package org.apache.lucene.monitor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.search.MatchAllDocsQuery;
public class TestMonitorConcurrentPurges extends MonitorTestBase {

  /** Number of queries registered in the monitor before each scenario starts timing. */
  private static final int NUM_QUERIES = 20000;

  /** Number of match requests timed in each scenario. */
  private static final int NUM_TRIES = 1000;

  private static String makeId(int idx) {
    return "TEST_" + idx;
  }

  private static String makeToken(int idx) {
    return "TOKEN_" + idx;
  }

  /** Builds a query that requires {@link #makeToken(int)} for the same index in {@code FIELD}. */
  private static String makeQuery(int idx) {
    return "+" + FIELD + ":" + makeToken(idx);
  }

  /** Registers {@code numQueries} queries (ids 0..numQueries-1) in a single batch. */
  private void populateMonitor(Monitor underTest, int numQueries) throws IOException {
    List<MonitorQuery> monitorQueries = new ArrayList<>(numQueries);
    for (int i = 0; i < numQueries; i++) {
      monitorQueries.add(mq(makeId(i), makeQuery(i)));
    }
    underTest.register(monitorQueries);
  }

  /**
   * Background task that keeps re-registering randomly chosen existing queries until {@link
   * #stop()} is called, so that an index update is in flight while matches are being timed.
   */
  private static class Updater implements Runnable {
    private final Random random;
    private final Monitor underTest;
    private final int numQueries;
    private volatile boolean isRunning = true;

    public Updater(Random random, Monitor underTest, int numQueries) {
      this.random = random;
      this.underTest = underTest;
      this.numQueries = numQueries;
    }

    private void sendSingleUpdateToMonitor() throws IOException {
      int idx = random.nextInt(numQueries);
      underTest.register(mq(makeId(idx), makeQuery(idx)));
    }

    /** Asks the update loop to exit once its current registration finishes. */
    public void stop() {
      isRunning = false;
    }

    @Override
    public void run() {
      while (isRunning) {
        try {
          sendSingleUpdateToMonitor();
        } catch (IOException e) {
          // Keep the original exception as the cause instead of flattening it into a string.
          throw new AssertionError(
              "IOException occurred while attempting to send updates to monitor", e);
        }
      }
    }
  }

  /**
   * Task that runs {@code numTries} match requests, each expected to hit exactly one query, and
   * records the wall-clock duration (nanoseconds) of each in {@link #getDurations()}.
   */
  private static class Queryer implements Runnable {
    private final Random random;
    private final Monitor underTest;
    private final int numQueries;
    private final int numTries;
    private final long[] durations;

    public Queryer(Random random, Monitor underTest, int numQueries, int numTries) {
      this.random = random;
      this.underTest = underTest;
      this.numQueries = numQueries;
      this.numTries = numTries;
      this.durations = new long[numTries];
    }

    /** Matches one document containing a random query's token; returns the elapsed nanoseconds. */
    private long sendSingleQueryToMonitor() throws IOException {
      int idx = random.nextInt(numQueries);
      long start = System.nanoTime();
      String text =
          "This is a test document that has a few tokens but not that many tokens and also has an important "
              + makeToken(idx)
              + " for matching";
      Document doc = new Document();
      // A plain TextField keeps the document shape identical across runs; the randomized
      // test-framework field factory could vary field options and add noise to the timings.
      doc.add(new TextField(FIELD, text, Field.Store.NO));
      int matchCount = underTest.match(doc, QueryMatch.SIMPLE_MATCHER).getMatchCount();
      long duration = System.nanoTime() - start;
      assertEquals(1, matchCount);
      return duration;
    }

    public long[] getDurations() {
      return durations;
    }

    @Override
    public void run() {
      for (int i = 0; i < numTries; i++) {
        try {
          durations[i] = sendSingleQueryToMonitor();
        } catch (IOException e) {
          throw new AssertionError(
              "IOException occurred while attempting to send queries to monitor", e);
        }
      }
    }
  }

  /** Arithmetic mean of {@code arr}; NaN for an empty array. */
  private static double mean(long[] arr) {
    double sum = 0;
    for (long value : arr) {
      sum += value;
    }
    return sum / arr.length;
  }

  /**
   * Returns a copy of the elements of the ascending-sorted array {@code sorted} from index
   * {@code length * percentile / 100} (floored) to the end, i.e. the slowest tail. For a
   * non-empty array and {@code percentile < 100} the result holds at least one element, and it
   * works for any array length (not only multiples of 100).
   */
  private static long[] tailFromPercentile(long[] sorted, int percentile) {
    int from = (int) ((long) sorted.length * percentile / 100);
    return Arrays.copyOfRange(sorted, Math.min(from, sorted.length), sorted.length);
  }

  /**
   * Runs one measurement scenario: opens a monitor on a fresh on-disk index with the given purge
   * frequency, registers {@link #NUM_QUERIES} queries, starts a background {@link Updater}, times
   * {@link #NUM_TRIES} matches on a separate thread, and returns the per-match durations.
   */
  private long[] runScenario(long purgeFrequency, TimeUnit purgeFrequencyUnit)
      throws IOException, InterruptedException {
    // A fresh directory per scenario, so a later scenario never opens an index that an earlier
    // scenario already wrote to; both runs start from the same state.
    Path indexPath = createTempDir();
    MonitorConfiguration config =
        new MonitorConfiguration()
            .setIndexPath(indexPath, MonitorQuerySerializer.fromParser(MonitorTestBase::parse));
    config.setPurgeFrequency(purgeFrequency, purgeFrequencyUnit);
    try (Monitor monitor = new Monitor(ANALYZER, config)) {
      // A large index makes each individual update take longer to complete.
      populateMonitor(monitor, NUM_QUERIES);
      // One fixed-seed generator per thread: sharing a single Random across threads would make
      // the sequence of chosen queries depend on thread scheduling.
      Updater updater = new Updater(new Random(1234L), monitor, NUM_QUERIES);
      Queryer queryer = new Queryer(new Random(5678L), monitor, NUM_QUERIES, NUM_TRIES);
      // The updater keeps an update in flight concurrently with every timed match.
      Thread updaterThread = new Thread(updater);
      updaterThread.start();
      try {
        Thread queryerThread = new Thread(queryer);
        queryerThread.start();
        queryerThread.join();
      } finally {
        // Always stop the updater (even if the join above is interrupted) so it never keeps
        // running against a monitor that is about to be closed.
        updater.stop();
        updaterThread.join();
      }
      return queryer.getDurations();
    }
  }

  /**
   * Times match requests under a concurrent updater in two scenarios, first with background
   * purging effectively disabled and then with a purge running almost continuously. It prints the
   * mean of the slowest 1% of match durations (nanoseconds) for each. Nothing is asserted; the
   * output is meant for manually comparing the two tail latencies.
   */
  public void testConcurrentBackgroundPurgesAndUpdatesAndMatchesTest()
      throws IOException, InterruptedException {
    // Disable purging for the duration of the first scenario.
    long[] durations1 = runScenario(1000, TimeUnit.SECONDS);

    // Always have a purge running concurrently with matching for the duration of the second
    // scenario. This cannot result in multiple purges at the same time because the background
    // purge thread is run using ScheduledExecutorService.scheduleAtFixedRate which has the
    // following behavior: "If any execution of this task takes longer than its period, then
    // subsequent executions may start late, but will not concurrently execute."
    long[] durations2 = runScenario(1, TimeUnit.MILLISECONDS);

    Arrays.sort(durations1);
    Arrays.sort(durations2);
    // Compute the means of the values in the p99 to p100 range of match durations.
    double x1Bar = mean(tailFromPercentile(durations1, 99));
    double x2Bar = mean(tailFromPercentile(durations2, 99));
    // The expectation is that these should be nearly the same. In practice, without the proposed
    // fix, the second one is at least an order of magnitude larger; with the proposed fix they
    // end up nearly the same.
    System.out.println("Commit + Search w/o Purge search p99 xbar = " + x1Bar);
    System.out.println("Commit + Search + Purge search p99 xbar = " + x2Bar);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment