Skip to content

Instantly share code, notes, and snippets.

@gsson
Created December 5, 2017 16:14
Show Gist options
  • Save gsson/ac6f8ad3c37dbb4544601f4e3c3b6edf to your computer and use it in GitHub Desktop.
Save gsson/ac6f8ad3c37dbb4544601f4e3c3b6edf to your computer and use it in GitHub Desktop.
Vert.x HttpClientRequest.reset() issue
import io.vertx.core.Vertx;
import io.vertx.core.http.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Consumer;
public class Main {
static void timeoutServer(Consumer<InetSocketAddress> listenAddress) {
try {
ServerSocket serverSocket = new ServerSocket(0, 1);
List<Socket> openSockets = new ArrayList<>();
System.err.println("Accepting connections on " + serverSocket.getLocalSocketAddress());
listenAddress.accept((InetSocketAddress) serverSocket.getLocalSocketAddress());
while (true) {
Socket newSocket = serverSocket.accept();
openSockets.add(newSocket);
System.err.println("Accepted sockets: " + openSockets.size());
}
} catch (IOException e) {
}
}
static InetSocketAddress startServer() throws InterruptedException {
SynchronousQueue<InetSocketAddress> queue = new SynchronousQueue<>();
Thread server = new Thread(() -> {
timeoutServer(socketAddress -> {
try {
queue.put(socketAddress);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
});
server.setDaemon(true);
server.start();
return queue.take();
}
static void makeRequest(HttpClient httpClient, int port, String host) throws InterruptedException {
HttpClientRequest clientRequest = httpClient.request(HttpMethod.GET, port, host, "/");
clientRequest.setTimeout(1000);
System.err.println("Created request " + clientRequest);
CountDownLatch done = new CountDownLatch(1);
clientRequest
.exceptionHandler(t -> {
System.err.println("Got exception " + t + " on " + clientRequest);
System.err.println("Resetting connection");
// Workaround...
//
// Not allowed to change the exception handler when the timeout exception happens, which I want to
// do to avoid getting called again
// clientRequest.exceptionHandler(t2 -> {});
// Works, but causes an error to be logged
// clientRequest.exceptionHandler(null);
//
// This will cause the exception handler to be called again with a 'VertxException'
// HttpConnection connection = clientRequest.connection();
// if (connection != null) {
// connection.close();
// }
clientRequest.reset();
done.countDown();
})
.handler(response -> {
System.err.println("Got response");
})
.endHandler(v -> {
System.err.println("Request end");
done.countDown();
})
.end();
done.await();
}
public static void main(String[] args) throws InterruptedException {
InetSocketAddress listenAddress = startServer();
Vertx vertx = Vertx.vertx();
HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions()
.setMaxPoolSize(1)
.setMaxWaitQueueSize(3));
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
makeRequest(httpClient, listenAddress.getPort(), listenAddress.getHostString());
// Although all requests *should* be complete, this causes the exception handler above to be called for seemingly
// a random HttpClientRequestImpl instance
vertx.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment