Created
November 13, 2023 17:24
-
-
Save daviscook477/a1b1fbebe75b0fdc41c889e542a2ed42 to your computer and use it in GitHub Desktop.
Test for demonstrating large p99s when using search, commit, and purgeCache concurrently
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.lucene.monitor; | |
import java.io.IOException; | |
import java.nio.file.Path; | |
import java.util.*; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.lucene.document.Document; | |
import org.apache.lucene.document.Field; | |
import org.apache.lucene.search.MatchAllDocsQuery; | |
public class TestMonitorConcurrentPurges extends MonitorTestBase { | |
private Path indexDirectory = createTempDir(); | |
private static String makeId(int idx) { | |
return "TEST_" + idx; | |
} | |
private static String makeToken(int idx) { | |
return "TOKEN_" + idx; | |
} | |
private static String makeQuery(int idx) { | |
return "+" + FIELD + ":TOKEN_" + idx; | |
} | |
private void populateMonitor(Monitor underTest, int numQueries) throws IOException { | |
List<MonitorQuery> monitorQueries = new ArrayList<>(); | |
for (int i = 0; i < numQueries; i++) { | |
monitorQueries.add(mq(makeId(i), makeQuery(i))); | |
} | |
underTest.register(monitorQueries); | |
} | |
private static class Updater implements Runnable { | |
private final Random random; | |
private final Monitor underTest; | |
private final int numQueries; | |
private volatile boolean isRunning = true; | |
public Updater(Random random, Monitor underTest, int numQueries) { | |
this.random = random; | |
this.underTest = underTest; | |
this.numQueries = numQueries; | |
} | |
private void sendSingleUpdateToMonitor() throws IOException { | |
int idx = random.nextInt(numQueries); | |
long start = System.nanoTime(); | |
underTest.register(mq(makeId(idx), makeQuery(idx))); | |
long end = System.nanoTime(); | |
} | |
public void stop() { | |
isRunning = false; | |
} | |
@Override | |
public void run() { | |
while (isRunning) { | |
try { | |
sendSingleUpdateToMonitor(); | |
} catch (IOException e) { | |
fail("IOException occurred while attempting to send updates to monitor: " + e.getMessage() + " " + Arrays.toString(e.getStackTrace())); | |
} | |
} | |
} | |
} | |
private static class Queryer implements Runnable { | |
private final Random random; | |
private final Monitor underTest; | |
private final int numQueries; | |
private final int numTries; | |
private long[] durations; | |
public Queryer(Random random, Monitor underTest, int numQueries, int numTries) { | |
this.random = random; | |
this.underTest = underTest; | |
this.numQueries = numQueries; | |
this.numTries = numTries; | |
this.durations = new long[numTries]; | |
} | |
private long sendSingleQueryToMonitor() throws IOException { | |
int idx = random.nextInt(numQueries); | |
long start = System.nanoTime(); | |
String text = "This is a test document that has a few tokens but not that many tokens and also has an important " + makeToken(idx) + " for matching"; | |
Document doc = new Document(); | |
doc.add(newTextField(FIELD, text, Field.Store.NO)); | |
int matchCount = underTest.match(doc, QueryMatch.SIMPLE_MATCHER).getMatchCount(); | |
long end = System.nanoTime(); | |
long duration = end - start; | |
assertEquals(1, matchCount); | |
return duration; | |
} | |
public long[] getDurations() { | |
return durations; | |
} | |
@Override | |
public void run() { | |
for (int i = 0; i < numTries; i++) { | |
try { | |
long duration = sendSingleQueryToMonitor(); | |
durations[i] = duration; | |
} catch (IOException e) { | |
fail("IOException occurred while attempting to send updates to monitor: " + e.getMessage() + " " + Arrays.toString(e.getStackTrace())); | |
} | |
} | |
} | |
} | |
private static double mean(long[] arr) { | |
double sum = 0; | |
for (int i = 0; i < arr.length; i++) { | |
sum += arr[i]; | |
} | |
return sum / arr.length; | |
} | |
/** | |
* Verifies that the performance of match requests is not negatively impacted by | |
* concurrent background purges and updates to the index. | |
*/ | |
public void testConcurrentBackgroundPurgesAndUpdatesAndMatchesTest() throws IOException, InterruptedException { | |
MonitorConfiguration config = | |
new MonitorConfiguration() | |
.setIndexPath( | |
indexDirectory, MonitorQuerySerializer.fromParser(MonitorTestBase::parse)); | |
Queryer queryer1; | |
Queryer queryer2; | |
// disable purging for the duration of the test of queryer1 | |
config.setPurgeFrequency(1000, TimeUnit.SECONDS); | |
try (Monitor monitor = new Monitor(ANALYZER, config)) { | |
// setup the monitor with a large index - this makes each individual update take longer to complete | |
int largeAmount = 20000; | |
populateMonitor(monitor, largeAmount); | |
Random rng = new Random(1234L); | |
/* | |
* create and start an updater thread that will repeatedly send updates to the monitor | |
* in order to ensure that the monitor is always processing an update concurrently with any | |
* other operations we take on the monitor | |
*/ | |
Updater updater = new Updater(rng, monitor, largeAmount); | |
Thread updaterThread = new Thread(updater); | |
updaterThread.start(); | |
int tries = 1000; | |
queryer1 = new Queryer(rng, monitor, largeAmount, tries); | |
Thread queryerThread1 = new Thread(queryer1); | |
queryerThread1.start(); | |
// shut down the test scenario nicely | |
queryerThread1.join(); | |
updater.stop(); | |
updaterThread.join(); | |
} | |
// always have a purge running concurrently with matching for the duration of the test of queryer2 | |
// this cannot result in multiple purges at the same time because the background purge thread is run using | |
// ScheduledExecutorService.scheduleAtFixedRate which has the following behavior: | |
// "If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute." | |
config.setPurgeFrequency(1, TimeUnit.MILLISECONDS); | |
try (Monitor monitor = new Monitor(ANALYZER, config)) { | |
// setup the monitor with a large index - this makes each individual update take longer to complete | |
int largeAmount = 20000; | |
populateMonitor(monitor, largeAmount); | |
Random rng = new Random(1234L); | |
/* | |
* create and start an updater thread that will repeatedly send updates to the monitor | |
* in order to ensure that the monitor is always processing an update concurrently with any | |
* other operations we take on the monitor | |
*/ | |
Updater updater = new Updater(rng, monitor, largeAmount); | |
Thread updaterThread = new Thread(updater); | |
updaterThread.start(); | |
int tries = 1000; | |
queryer2 = new Queryer(rng, monitor, largeAmount, tries); | |
Thread queryerThread2 = new Thread(queryer2); | |
queryerThread2.start(); | |
// shut down the test scenario nicely | |
queryerThread2.join(); | |
updater.stop(); | |
updaterThread.join(); | |
} | |
long[] durations1 = queryer1.getDurations(); | |
Arrays.sort(durations1); | |
long[] durations1percentile99 = new long[durations1.length/100]; | |
int start99 = durations1.length/100*99; | |
for (int i = start99; i < durations1.length; i++) { | |
durations1percentile99[i-start99] = durations1[i]; | |
} | |
long[] durations2 = queryer2.getDurations(); | |
Arrays.sort(durations2); | |
long[] durations2percentile99 = new long[durations2.length/100]; | |
start99 = durations2.length/100*99; | |
for (int i = start99; i < durations2.length; i++) { | |
durations2percentile99[i-start99] = durations2[i]; | |
} | |
// compute the means of the values in p99 to p100 duration for matches | |
double x1Bar = mean(durations1percentile99); | |
double x2Bar = mean(durations2percentile99); | |
// the expectation is that these should be nearly the same but in practice, without the proposed fix, | |
// the second one is at least an order of magnitude larger - when running this test with the proposed fix | |
// they do end up nearly the same | |
System.out.println("Commit + Search w/o Purge search p99 xbar = " + x1Bar); | |
System.out.println("Commit + Search + Purge search p99 xbar = " + x2Bar); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment