Skip to content

Instantly share code, notes, and snippets.

@russelldb
Created February 14, 2011 18:28
Show Gist options
  • Save russelldb/826311 to your computer and use it in GitHub Desktop.
Save russelldb/826311 to your computer and use it in GitHub Desktop.
Shows that the limiter to concurrency is the Max Hosts Per Connection not
/**
*
*/
package brown.russell.httpclient.TestConcurrentClient;
import static org.junit.Assert.assertEquals;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.junit.Test;
/**
* @author russell
*
*/
public class TestConcurrentClient {
private static final Logger logger = Logger.getLogger("TestConcurrentClient");
static volatile boolean run = true;
/**
* Shows that the MaxConnectionsPerHost is the real limiter on concurrency
* for Riak client.
*
* @throws java.lang.Exception
*/
@Test public void maxConnectionPerHostLimitsConcurrency() throws Exception {
final int port = 4567;
final String url = "http://localhost:" + port;
final int clientThreads = 10;
final int maxPerHost = clientThreads - 2;
final CountDownLatch startLatch = new CountDownLatch(1);
final Server server = new Server(port);
new Thread(server).start();
assertEquals(0, server.getActive());
HttpConnectionManager httpConnectionManager = new MultiThreadedHttpConnectionManager();
HttpClient httpClient = new HttpClient(httpConnectionManager);
HttpConnectionManagerParams managerParams = httpConnectionManager.getParams();
managerParams.setMaxTotalConnections(clientThreads);
managerParams.setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, maxPerHost);
ClientRunner[] runners = new ClientRunner[clientThreads];
for (int i = 0; i < clientThreads; i++) {
runners[i] = new ClientRunner(startLatch, httpClient, url);
new Thread(runners[i]).start();
}
startLatch.countDown();
Thread.sleep(1000);
assertEquals(maxPerHost, server.getActive());
int activeCount = 0;
for (ClientRunner runner : runners) {
if (runner.isActive()) {
activeCount++;
}
}
assertEquals(maxPerHost, activeCount);
run = false;
Thread.sleep(1000);
}
private static final class ClientRunner implements Runnable {
private final CountDownLatch startLatch;
private final HttpClient httpClient;
private final GetMethod getMethod;
private volatile boolean isActive = false;
/**
* @param startLatch
* @param endLatch
* @param httpClient
*/
public ClientRunner(final CountDownLatch startLatch, final HttpClient httpClient, String url) {
this.startLatch = startLatch;
this.httpClient = httpClient;
this.getMethod = new GetMethod(url);
}
@Override public void run() {
try {
startLatch.await();
httpClient.executeMethod(getMethod);
InputStream stream = getMethod.getResponseBodyAsStream();
isActive = true;
while (run) {
stream.read(new byte[1024]);
Thread.sleep(100);
}
isActive = false;
stream.close();
getMethod.releaseConnection();
logger.info("Client released connection");
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean isActive() {
return isActive;
}
}
private static final class Server implements Runnable {
private ServerSocket serverSocket;
private volatile int active = 0;
private final int port;
/**
* @param port
*/
public Server(int port) {
this.port = port;
}
public int getActive() {
return active;
}
@Override public void run() {
try {
serverSocket = new ServerSocket(port);
logger.info("Server started");
} catch (IOException e) {
logger.log(Level.SEVERE, "Could not listen on port: " + port);
}
while (run) {
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
ServerWorker client = new ServerWorker(clientSocket);
new Thread(client).start();
active++;
} catch (IOException e) {
logger.log(Level.SEVERE, "Accept failed: " + port);
}
}
logger.info("Server shutting down");
}
}
private static final class ServerWorker implements Runnable {
public static final int HTTP_OK = 200;
private static final byte[] EOL = { (byte) '\r', (byte) '\n' };
private final Socket socket;
private ServerWorker(final Socket socket) {
this.socket = socket;
}
@Override public void run() {
try {
logger.info("New connection started");
InputStream is = new BufferedInputStream(socket.getInputStream());
PrintStream ps = new PrintStream(socket.getOutputStream());
ps.print("HTTP/1.1 " + HTTP_OK + " OK");
ps.write(EOL);
ps.print("Server: Henrich");
ps.write(EOL);
ps.print("Date: " + (new Date()));
ps.write(EOL);
// ps.print("Content-length: er...er...hm");
ps.print("Content-type: application/octet-stream");
ps.write(EOL);
ps.write(EOL);
while (run) {
ps.print("tick tock\n");
ps.flush();
Thread.sleep(500);
}
is.close();
ps.close();
socket.close();
logger.info("Connection shutting down");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment