Created
February 14, 2011 18:28
-
-
Save russelldb/826311 to your computer and use it in GitHub Desktop.
Shows that the limiter to concurrency is the Max Hosts Per Connection not
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* | |
*/ | |
package brown.russell.httpclient.TestConcurrentClient; | |
import static org.junit.Assert.assertEquals; | |
import java.io.BufferedInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.PrintStream; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.util.Date; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import org.apache.commons.httpclient.HostConfiguration; | |
import org.apache.commons.httpclient.HttpClient; | |
import org.apache.commons.httpclient.HttpConnectionManager; | |
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; | |
import org.apache.commons.httpclient.methods.GetMethod; | |
import org.apache.commons.httpclient.params.HttpConnectionManagerParams; | |
import org.junit.Test; | |
/** | |
* @author russell | |
* | |
*/ | |
public class TestConcurrentClient { | |
private static final Logger logger = Logger.getLogger("TestConcurrentClient"); | |
static volatile boolean run = true; | |
/** | |
* Shows that the MaxConnectionsPerHost is the real limiter on concurrency | |
* for Riak client. | |
* | |
* @throws java.lang.Exception | |
*/ | |
@Test public void maxConnectionPerHostLimitsConcurrency() throws Exception { | |
final int port = 4567; | |
final String url = "http://localhost:" + port; | |
final int clientThreads = 10; | |
final int maxPerHost = clientThreads - 2; | |
final CountDownLatch startLatch = new CountDownLatch(1); | |
final Server server = new Server(port); | |
new Thread(server).start(); | |
assertEquals(0, server.getActive()); | |
HttpConnectionManager httpConnectionManager = new MultiThreadedHttpConnectionManager(); | |
HttpClient httpClient = new HttpClient(httpConnectionManager); | |
HttpConnectionManagerParams managerParams = httpConnectionManager.getParams(); | |
managerParams.setMaxTotalConnections(clientThreads); | |
managerParams.setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, maxPerHost); | |
ClientRunner[] runners = new ClientRunner[clientThreads]; | |
for (int i = 0; i < clientThreads; i++) { | |
runners[i] = new ClientRunner(startLatch, httpClient, url); | |
new Thread(runners[i]).start(); | |
} | |
startLatch.countDown(); | |
Thread.sleep(1000); | |
assertEquals(maxPerHost, server.getActive()); | |
int activeCount = 0; | |
for (ClientRunner runner : runners) { | |
if (runner.isActive()) { | |
activeCount++; | |
} | |
} | |
assertEquals(maxPerHost, activeCount); | |
run = false; | |
Thread.sleep(1000); | |
} | |
private static final class ClientRunner implements Runnable { | |
private final CountDownLatch startLatch; | |
private final HttpClient httpClient; | |
private final GetMethod getMethod; | |
private volatile boolean isActive = false; | |
/** | |
* @param startLatch | |
* @param endLatch | |
* @param httpClient | |
*/ | |
public ClientRunner(final CountDownLatch startLatch, final HttpClient httpClient, String url) { | |
this.startLatch = startLatch; | |
this.httpClient = httpClient; | |
this.getMethod = new GetMethod(url); | |
} | |
@Override public void run() { | |
try { | |
startLatch.await(); | |
httpClient.executeMethod(getMethod); | |
InputStream stream = getMethod.getResponseBodyAsStream(); | |
isActive = true; | |
while (run) { | |
stream.read(new byte[1024]); | |
Thread.sleep(100); | |
} | |
isActive = false; | |
stream.close(); | |
getMethod.releaseConnection(); | |
logger.info("Client released connection"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
public boolean isActive() { | |
return isActive; | |
} | |
} | |
private static final class Server implements Runnable { | |
private ServerSocket serverSocket; | |
private volatile int active = 0; | |
private final int port; | |
/** | |
* @param port | |
*/ | |
public Server(int port) { | |
this.port = port; | |
} | |
public int getActive() { | |
return active; | |
} | |
@Override public void run() { | |
try { | |
serverSocket = new ServerSocket(port); | |
logger.info("Server started"); | |
} catch (IOException e) { | |
logger.log(Level.SEVERE, "Could not listen on port: " + port); | |
} | |
while (run) { | |
Socket clientSocket = null; | |
try { | |
clientSocket = serverSocket.accept(); | |
ServerWorker client = new ServerWorker(clientSocket); | |
new Thread(client).start(); | |
active++; | |
} catch (IOException e) { | |
logger.log(Level.SEVERE, "Accept failed: " + port); | |
} | |
} | |
logger.info("Server shutting down"); | |
} | |
} | |
private static final class ServerWorker implements Runnable { | |
public static final int HTTP_OK = 200; | |
private static final byte[] EOL = { (byte) '\r', (byte) '\n' }; | |
private final Socket socket; | |
private ServerWorker(final Socket socket) { | |
this.socket = socket; | |
} | |
@Override public void run() { | |
try { | |
logger.info("New connection started"); | |
InputStream is = new BufferedInputStream(socket.getInputStream()); | |
PrintStream ps = new PrintStream(socket.getOutputStream()); | |
ps.print("HTTP/1.1 " + HTTP_OK + " OK"); | |
ps.write(EOL); | |
ps.print("Server: Henrich"); | |
ps.write(EOL); | |
ps.print("Date: " + (new Date())); | |
ps.write(EOL); | |
// ps.print("Content-length: er...er...hm"); | |
ps.print("Content-type: application/octet-stream"); | |
ps.write(EOL); | |
ps.write(EOL); | |
while (run) { | |
ps.print("tick tock\n"); | |
ps.flush(); | |
Thread.sleep(500); | |
} | |
is.close(); | |
ps.close(); | |
socket.close(); | |
logger.info("Connection shutting down"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment