Skip to content

Instantly share code, notes, and snippets.

@marcogrcr
Last active March 21, 2024 17:35
Show Gist options
  • Save marcogrcr/450ddde39bf831297022da3b253c0efb to your computer and use it in GitHub Desktop.
Save marcogrcr/450ddde39bf831297022da3b253c0efb to your computer and use it in GitHub Desktop.
Java AWS SDK v2 and virtual threads

Until the AWS SDK for Java v2 officially supports a blocking HTTP client that is compatible with virtual threads, you can use the async clients and block on the returned CompletableFuture<T> from inside a virtual thread, using the pattern in this gist.

TODO: Get confirmation whether this is a valid approach.

import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
/**
 * Demo of driving the AWS SDK v2 <em>async</em> DynamoDB client from virtual threads.
 *
 * <p>The program starts a local {@link HttpServer} that answers every request with a canned
 * {@code GetItem}-style JSON body after an artificial delay, sends one warm-up request, and then
 * fires {@link #TOTAL_REQUESTS} concurrent requests, each one issued from its own virtual thread
 * that blocks on the {@link CompletableFuture} returned by the SDK. Timings are printed to
 * standard output.
 */
public class AwsSdkV2AndVirtualThreads {
    /** Port the local demo server listens on and the SDK client is pointed at. */
    private static final int PORT = 8080;

    /** Artificial delay applied by the server to every request except the first (warm-up) one. */
    private static final Duration REQUEST_LATENCY = Duration.ofSeconds(5);

    /** Number of concurrent requests sent after the warm-up request. */
    private static final int TOTAL_REQUESTS = 1_000;

    /**
     * Runs the demo: start server, warm up the client, send the concurrent requests, shut down.
     *
     * @param args ignored
     * @throws ExecutionException if the warm-up request fails
     * @throws InterruptedException if the main thread is interrupted while waiting for the warm-up request
     * @throws IOException if the local server cannot be created
     * @throws URISyntaxException if the endpoint override URI cannot be parsed
     */
    public static void main(final String[] args) throws ExecutionException, InterruptedException, IOException, URISyntaxException {
        final var atomicNanoTime = new AtomicLong();

        // 1. Create a local HTTP server for demo purposes
        final var server = createServer(atomicNanoTime);
        server.start();
        System.out.printf("Server started on http://localhost:%s%n", PORT);

        try {
            // 2. Create an AWS SDK client and warm it up.
            // The client is owned (and closed) here so it is released even if the warm-up fails.
            try (var client = createClient()) {
                System.out.println("Warming up client...");
                atomicNanoTime.set(System.nanoTime());
                sendRequest(client).get();
                System.out.printf("Finished in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get()));

                // 3. Send the requests
                sendRequests(client, atomicNanoTime);
            }
        } finally {
            // Always stop the server, otherwise a failure above would leave it running.
            server.stop(1);
        }
    }

    /**
     * Builds (but does not start) the local demo server.
     *
     * <p>The handler counts requests and responses and prints the elapsed time, measured from the
     * value currently stored in {@code atomicNanoTime}, once {@code TOTAL_REQUESTS + 1} of each
     * have been seen (the {@code + 1} accounts for the warm-up request). The very first request is
     * answered immediately; every later request sleeps for {@link #REQUEST_LATENCY} first.
     *
     * @param atomicNanoTime holder of the {@link System#nanoTime()} reference point used for the printed timings
     * @return the configured server, handling each exchange on a new virtual thread
     * @throws IOException if the server cannot be created on {@link #PORT}
     */
    private static HttpServer createServer(final AtomicLong atomicNanoTime) throws IOException {
        final var firstRequest = new AtomicBoolean(true);
        final var requestCount = new AtomicInteger(0);
        final var responseCount = new AtomicInteger(0);
        // The body never changes, so encode it once instead of once per request.
        final var response = "{\"Item\":{\"...\":{\"S\":\"...\"}}}".getBytes(StandardCharsets.UTF_8);

        final var server = HttpServer.create(new InetSocketAddress(PORT), 1_000_000);
        server.createContext("/", exchange -> {
            // try-with-resources guarantees the exchange is closed even if writing the response fails.
            try (exchange) {
                if (requestCount.incrementAndGet() == TOTAL_REQUESTS + 1) {
                    System.out.printf("Received all requests in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get()));
                }

                // getAndSet makes "am I the first request?" a single atomic step, so exactly one
                // request can ever skip the artificial latency.
                if (!firstRequest.getAndSet(false)) {
                    try {
                        Thread.sleep(REQUEST_LATENCY);
                    } catch (InterruptedException e) {
                        // Restore the flag so code further up the stack can observe the interrupt.
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while simulating request latency", e);
                    }
                }

                exchange.getResponseHeaders().add("content-type", "application/json");
                exchange.sendResponseHeaders(200, response.length);
                exchange.getResponseBody().write(response);
            }

            if (responseCount.incrementAndGet() == TOTAL_REQUESTS + 1) {
                System.out.printf("Returned all responses in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get()));
            }
        });
        server.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        return server;
    }

    /**
     * Creates the async DynamoDB client pointed at the local demo server.
     *
     * <p>The Netty HTTP client is allowed {@link #TOTAL_REQUESTS} concurrent requests and the
     * SDK's future-completion executor is a virtual-thread-per-task executor.
     *
     * @return a new client; the caller is responsible for closing it
     * @throws URISyntaxException if the endpoint override URI cannot be parsed
     */
    private static DynamoDbAsyncClient createClient() throws URISyntaxException {
        return DynamoDbAsyncClient
            .builder()
            .endpointOverride(new URI("http://localhost:%s".formatted(PORT)))
            .httpClientBuilder(
                // See: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-netty.html
                NettyNioAsyncHttpClient
                    .builder()
                    .maxConcurrency(TOTAL_REQUESTS)
            )
            .asyncConfiguration(b -> b
                .advancedOption(
                    // See: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/asynchronous.html
                    SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
                    Executors.newVirtualThreadPerTaskExecutor()
                )
            )
            .build();
    }

    /**
     * Sends {@link #TOTAL_REQUESTS} requests, each from its own virtual thread, waits for all of
     * them and prints the elapsed time. If any request failed, the number of failures is printed
     * as well so the timing line is not mistaken for a fully successful run.
     *
     * <p>The client is borrowed: it is not closed by this method.
     *
     * @param client the client used to send the requests
     * @param atomicNanoTime set to {@link System#nanoTime()} right before the requests are submitted
     */
    private static void sendRequests(final DynamoDbAsyncClient client, final AtomicLong atomicNanoTime) {
        final var failures = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            System.out.println("Sending requests...");
            atomicNanoTime.set(System.nanoTime());

            final var futures = IntStream
                .range(0, TOTAL_REQUESTS)
                .mapToObj(i -> CompletableFuture.runAsync(() -> sendAndAwait(client, failures), executor))
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();

            System.out.printf("Finished in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get()));
            if (failures.get() > 0) {
                System.out.printf("Failed requests: %s of %s%n", failures.get(), TOTAL_REQUESTS);
            }
        }
    }

    /**
     * Sends one request and blocks the calling thread until it completes. Failures are printed and
     * counted rather than propagated, so one failed request does not abort the others.
     *
     * @param client the client used to send the request
     * @param failures incremented once if the request fails or the wait is interrupted
     */
    private static void sendAndAwait(final DynamoDbAsyncClient client, final AtomicInteger failures) {
        try {
            /*
             * This will block the current virtual thread, releasing the carrier platform thread to do other work.
             * We use `.get()` instead of `.join()` so the Virtual Thread can react to interrupts.
             */
            sendRequest(client).get();
            // TODO: do something with the value returned by `.get()`
        } catch (ExecutionException e) {
            failures.incrementAndGet();
            e.printStackTrace(System.out);
        } catch (InterruptedException e) {
            // Restore the flag so the executor / caller can still see that we were interrupted.
            Thread.currentThread().interrupt();
            failures.incrementAndGet();
            e.printStackTrace(System.out);
        }
    }

    /**
     * Issues a single placeholder {@code GetItem} call.
     *
     * @param client the client used to send the request
     * @return the future returned by the SDK for the call
     */
    private static CompletableFuture<?> sendRequest(final DynamoDbAsyncClient client) {
        /*
         * This will complete asynchronously via Netty/CRT's internal threads (if any).
         * In the case of Netty HTTP client, AWS SDK will use a virtual thread to process the
         * continuation of the future returned by Netty.
         */
        return client.getItem(r -> r.tableName("...").key(Map.of("...", AttributeValue.fromS("..."))));
    }
}

Example run on a MacBook Pro (12-core M3 Pro, 36 GB) with many other programs open and consuming a lot of memory. Your mileage may vary.

Server started on http://localhost:8080
Warming up client...
Finished in: PT0.67260125S
Sending requests...
Received all requests in: PT2.741794125S
Returned all responses in: PT7.871602458S
Finished in: PT8.009118625S
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment