Skip to content

Instantly share code, notes, and snippets.

@andsel
Created March 30, 2020 09:52
Show Gist options
  • Save andsel/d83bb3359d585ee8f5d4cef0e164dbae to your computer and use it in GitHub Desktop.
Save andsel/d83bb3359d585ee8f5d4cef0e164dbae to your computer and use it in GitHub Desktop.
queue throughput perf check
import org.eclipse.jetty.toolchain.perf.PlatformTimer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.concurrent.*;
/**
 * Manual throughput check for a file-backed {@code Queue}.
 *
 * <p>A single producer (the test thread) writes {@link #numToSend} elements at a
 * throttled rate of roughly {@link #messagesPerSecond} per second while one
 * consumer task, running on a dedicated single-thread executor, drains the queue
 * in batches. The test prints the time needed to write, the total time until the
 * consumer has drained everything, and the resulting messages-per-second figure.
 */
public class QueuePerfTest {

    /** Maximum number of elements requested from the queue in a single read. */
    private static final int BATCH_SIZE = 1000;

    /** Target write rate; the pause between two writes is derived from it. */
    int messagesPerSecond = 100_000;

    /** Number of elements written (and expected to be consumed) in the measured run. */
    long numToSend = 10_000_000;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    private ExecutorService executor;
    private String dataPath;

    /** Creates a fresh data directory and the single-thread executor that runs the consumer. */
    @Before
    public void setUp() throws Exception {
        dataPath = temporaryFolder.newFolder("data").getPath();
        executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Stops the consumer executor, interrupting a consumer that is still running.
     *
     * @throws IllegalStateException if the executor does not terminate within two minutes
     */
    @After
    public void tearDown() throws Exception {
        executor.shutdownNow();
        if (!executor.awaitTermination(2L, TimeUnit.MINUTES)) {
            throw new IllegalStateException("Failed to shut down Executor");
        }
    }

    /**
     * Writes {@link #numToSend} elements at the configured rate while a concurrent
     * consumer drains them, then reports write time, total time and throughput.
     *
     * @throws Exception if the queue fails, or the consumer fails or does not finish
     *                   within one minute after the last write
     */
    @Test
    public void perfFloodTest() throws Exception {
        System.out.println("Data path: " + dataPath);
        Queue q = new Queue(settings(256 * 1024 * 1024, dataPath));
        q.open();
        try {
            warmup(q, 3);

            long pauseMicroseconds = (1_000 * 1_000) / this.messagesPerSecond;
            Queueable element = new StringElement("foobarbaz");
            PlatformTimer timer = PlatformTimer.detect();
            System.out.println("Pause: " + pauseMicroseconds + "micro secs");

            // Start the clock only after timer detection and element setup so that
            // one-off initialisation cost is not attributed to the queue.
            long startNanos = System.nanoTime();
            Future<?> readerTask = startupConsumer(q, numToSend);
            // long index: numToSend is a long, an int counter would wrap for large runs.
            for (long i = 0; i < numToSend; i++) {
                q.write(element);
                timer.sleep(pauseMicroseconds);
            }
            long writeCompletedNanos = System.nanoTime() - startNanos;

            // Propagates any consumer failure as ExecutionException.
            readerTask.get(1, TimeUnit.MINUTES);
            long elapsedNanos = System.nanoTime() - startNanos;

            System.out.println("Write completed in (ms): " + (writeCompletedNanos / (1_000 * 1_000)));
            System.out.println("Elapsed time (ms): " + (elapsedNanos / (1_000 * 1_000)));
            // Computed from nanoseconds to avoid the truncation of whole milliseconds.
            System.out.printf("Speed msg/sec: %.2f \n", numToSend * 1e9 / elapsedNanos);
        } finally {
            // Release the queue even when a write or the consumer fails.
            q.close();
        }
    }

    /**
     * Runs {@code warmup_iterations} unthrottled write/read rounds of 10,000
     * elements each before the measured run.
     */
    private void warmup(Queue q, long warmup_iterations) throws Exception {
        for (long i = 0; i < warmup_iterations; i++) {
            System.out.println("Warmup iteration " + (i + 1));
            warmupIteration(q, 10_000);
        }
        System.out.println("Warmup completed");
    }

    /**
     * Writes {@code warmup_loops} elements as fast as possible and waits (up to one
     * minute) for the consumer to drain them.
     */
    private void warmupIteration(Queue q, long warmup_loops) throws Exception {
        Future<?> readerTask = startupConsumer(q, warmup_loops);
        Queueable msg = new StringElement("foobarbaz");
        for (long i = 0; i < warmup_loops; i++) {
            q.write(msg);
        }
        readerTask.get(1, TimeUnit.MINUTES);
    }

    /**
     * Submits a consumer task that reads from {@code q} until exactly
     * {@code expectedElements} elements have been consumed.
     *
     * <p>Elements are counted individually instead of assuming that every read
     * returns a full batch, so a short batch cannot make the consumer finish early.
     * The task is a {@code Callable}, therefore an {@code IOException} raised while
     * reading surfaces through {@link Future#get} rather than being swallowed.
     *
     * @param q                the opened queue to read from
     * @param expectedElements number of elements to consume before completing
     * @return a future completing when all elements were consumed, or failing if
     *         reading failed or the task was interrupted first
     */
    private Future<?> startupConsumer(Queue q, long expectedElements) {
        return executor.submit(() -> {
            final long timeoutMillis = TimeUnit.SECONDS.toMillis(1);
            long consumed = 0;
            while (consumed < expectedElements && !Thread.currentThread().isInterrupted()) {
                int limit = (int) Math.min(BATCH_SIZE, expectedElements - consumed);
                try (Batch batch = q.readBatch(limit, timeoutMillis)) {
                    // NOTE(review): assumes readBatch may hand back null or a partial
                    // batch when the timeout elapses; confirm against Queue.
                    if (batch == null) {
                        continue;
                    }
                    for (final Queueable elem : batch.getElements()) {
                        elem.hashCode();
                        consumed++;
                    }
                }
            }
            if (consumed < expectedElements) {
                throw new IllegalStateException(
                        "Consumer interrupted after " + consumed + " of " + expectedElements + " elements");
            }
            return null;
        });
    }

    /**
     * Builds file-backed queue settings rooted at {@code folder}.
     *
     * @param queueSize value passed as the capacity; the maximum queue size in bytes
     *                  is set to four times this value
     * @param folder    directory in which the queue stores its data
     */
    private static Settings settings(int queueSize, String folder) {
        return SettingsImpl.fileSettingsBuilder(folder)
                .capacity(queueSize)
                // Widen before multiplying so larger capacities cannot overflow int.
                .queueMaxBytes(4L * queueSize)
                .checkpointMaxWrites(1024)
                .checkpointMaxAcks(1024)
                .elementClass(StringElement.class)
                .build();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment