Created
November 3, 2022 23:05
-
-
Save ebarlas/70e38ca01abb68210d5158206401644c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package loomtest; | |
import jdk.incubator.concurrent.StructuredTaskScope; | |
import java.io.IOException; | |
import java.net.URI; | |
import java.net.http.HttpClient; | |
import java.net.http.HttpRequest; | |
import java.net.http.HttpResponse; | |
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.IntStream; | |
public class FixedThroughput { | |
private final Semaphore semaphore; // limits concurrent tasks/requests/connections | |
private final AtomicLong taskCounter; // number of tasks queued | |
private final AtomicLong requestCounter; // number of requests completed | |
private final HttpClient httpClient; | |
private final HttpRequest request; | |
private final double requestsPerNano; | |
private final long startTime; | |
private final long endTime; | |
FixedThroughput(String uri, int maxConcurrency, Duration testDuration, int requestsPerSecond) { | |
semaphore = new Semaphore(maxConcurrency); | |
taskCounter = new AtomicLong(); | |
requestCounter = new AtomicLong(); | |
httpClient = HttpClient.newBuilder().executor(Executors.newVirtualThreadPerTaskExecutor()).build(); | |
request = HttpRequest.newBuilder(URI.create(uri)).GET().build(); | |
requestsPerNano = (double) requestsPerSecond / TimeUnit.SECONDS.toNanos(1); | |
startTime = System.nanoTime(); | |
endTime = startTime + testDuration.toNanos(); | |
} | |
void run() throws InterruptedException, TimeoutException, ExecutionException { | |
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { | |
scope.fork(() -> scheduleRequests(scope)); | |
scope.joinUntil(Instant.now().plusSeconds(60)); | |
scope.throwIfFailed(); | |
} | |
var d = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); | |
var tput = 1.0 * requestCounter.get() / d * 1000; | |
System.out.println("Test took: " + d + "ms count: " + taskCounter.get() + " responses: " + requestCounter.get() + " observed throughput: " + tput + " req/s"); | |
} | |
Void scheduleRequests(StructuredTaskScope.ShutdownOnFailure scope) { | |
while (System.nanoTime() < endTime) { | |
var elapsed = System.nanoTime() - startTime; | |
var target = requestsPerNano * elapsed; | |
var deficit = target - taskCounter.get(); | |
if (deficit > 1) { | |
var newTasks = (int) Math.floor(deficit); | |
IntStream.range(0, newTasks).forEach(n -> scope.fork(this::sendRequest)); | |
taskCounter.addAndGet(newTasks); | |
} | |
try { | |
Thread.sleep(1); | |
} catch (InterruptedException e) { | |
break; | |
} | |
} | |
return null; | |
} | |
Void sendRequest() throws InterruptedException, IOException { | |
semaphore.acquire(); // wait my turn by acquiring permit | |
try { | |
httpClient.send(request, HttpResponse.BodyHandlers.discarding()); | |
} finally { | |
semaphore.release(); // release permit, allow next in line | |
requestCounter.incrementAndGet(); | |
} | |
return null; | |
} | |
public static void main(String[] args) throws Exception { | |
var uri = "http://localhost:8080/test"; | |
var duration = 5; | |
var rate = 10_000; | |
var tests = 2; | |
var maxConcurrency = 1_000; | |
System.out.println("uri: " + uri + " duration: " + duration + " s" + " asked throughput: " + rate + " req/s" + " #tests: " + tests); | |
for (int i = 0; i < tests; ++i) { | |
new FixedThroughput(uri, maxConcurrency, Duration.ofSeconds(duration), rate).run(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment