Skip to content

Instantly share code, notes, and snippets.

@ebarlas
Created November 3, 2022 23:05
Show Gist options
  • Save ebarlas/70e38ca01abb68210d5158206401644c to your computer and use it in GitHub Desktop.
Save ebarlas/70e38ca01abb68210d5158206401644c to your computer and use it in GitHub Desktop.
package loomtest;
import jdk.incubator.concurrent.StructuredTaskScope;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
public class FixedThroughput {
private final Semaphore semaphore; // limits concurrent tasks/requests/connections
private final AtomicLong taskCounter; // number of tasks queued
private final AtomicLong requestCounter; // number of requests completed
private final HttpClient httpClient;
private final HttpRequest request;
private final double requestsPerNano;
private final long startTime;
private final long endTime;
FixedThroughput(String uri, int maxConcurrency, Duration testDuration, int requestsPerSecond) {
semaphore = new Semaphore(maxConcurrency);
taskCounter = new AtomicLong();
requestCounter = new AtomicLong();
httpClient = HttpClient.newBuilder().executor(Executors.newVirtualThreadPerTaskExecutor()).build();
request = HttpRequest.newBuilder(URI.create(uri)).GET().build();
requestsPerNano = (double) requestsPerSecond / TimeUnit.SECONDS.toNanos(1);
startTime = System.nanoTime();
endTime = startTime + testDuration.toNanos();
}
void run() throws InterruptedException, TimeoutException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> scheduleRequests(scope));
scope.joinUntil(Instant.now().plusSeconds(60));
scope.throwIfFailed();
}
var d = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
var tput = 1.0 * requestCounter.get() / d * 1000;
System.out.println("Test took: " + d + "ms count: " + taskCounter.get() + " responses: " + requestCounter.get() + " observed throughput: " + tput + " req/s");
}
Void scheduleRequests(StructuredTaskScope.ShutdownOnFailure scope) {
while (System.nanoTime() < endTime) {
var elapsed = System.nanoTime() - startTime;
var target = requestsPerNano * elapsed;
var deficit = target - taskCounter.get();
if (deficit > 1) {
var newTasks = (int) Math.floor(deficit);
IntStream.range(0, newTasks).forEach(n -> scope.fork(this::sendRequest));
taskCounter.addAndGet(newTasks);
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
return null;
}
Void sendRequest() throws InterruptedException, IOException {
semaphore.acquire(); // wait my turn by acquiring permit
try {
httpClient.send(request, HttpResponse.BodyHandlers.discarding());
} finally {
semaphore.release(); // release permit, allow next in line
requestCounter.incrementAndGet();
}
return null;
}
public static void main(String[] args) throws Exception {
var uri = "http://localhost:8080/test";
var duration = 5;
var rate = 10_000;
var tests = 2;
var maxConcurrency = 1_000;
System.out.println("uri: " + uri + " duration: " + duration + " s" + " asked throughput: " + rate + " req/s" + " #tests: " + tests);
for (int i = 0; i < tests; ++i) {
new FixedThroughput(uri, maxConcurrency, Duration.ofSeconds(duration), rate).run();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment