Created
July 14, 2012 23:33
-
-
Save usmanismail/3113868 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//======================================================================== | |
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd. | |
//------------------------------------------------------------------------ | |
//All rights reserved. This program and the accompanying materials | |
//are made available under the terms of the Eclipse Public License v1.0 | |
//and Apache License v2.0 which accompanies this distribution. | |
//The Eclipse Public License is available at | |
//http://www.eclipse.org/legal/epl-v10.html | |
//The Apache License v2.0 is available at | |
//http://www.opensource.org/licenses/apache2.0.php | |
//You may elect to redistribute this code under either of these licenses. | |
//======================================================================== | |
package org.eclipse.jetty.client; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.io.OutputStream; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.eclipse.jetty.client.Address; | |
import org.eclipse.jetty.client.ContentExchange; | |
import org.eclipse.jetty.client.HttpClient; | |
import org.eclipse.jetty.client.HttpExchange; | |
import org.eclipse.jetty.util.log.Log; | |
import org.eclipse.jetty.util.log.Logger; | |
import org.testng.Assert; | |
import org.testng.annotations.Test; | |
public class ExpirationWithLimitedConnectionsTest { | |
@Test(groups = "integration") | |
public void testExpirationWithMaxConnectionPerAddressReached() | |
throws Exception { | |
final Logger logger = Log.getLogger("org.eclipse.jetty.client"); | |
logger.setDebugEnabled(true); | |
HttpClient client = new HttpClient(); | |
int maxConnectionsPerAddress = 5; | |
client.setMaxConnectionsPerAddress(maxConnectionsPerAddress); | |
long timeout = 1000; | |
client.setTimeout(timeout); | |
client.start(); | |
final List<Socket> sockets = new CopyOnWriteArrayList<Socket>(); | |
final List<Exception> failures = new CopyOnWriteArrayList<Exception>(); | |
final AtomicLong processingDelay = new AtomicLong(3000); | |
final ExecutorService threadPool = Executors.newCachedThreadPool(); | |
final ServerSocket server = new ServerSocket(0); | |
threadPool.submit(new Runnable() { | |
public void run() { | |
while (true) { | |
try { | |
final Socket socket = server.accept(); | |
sockets.add(socket); | |
logger.debug("CONNECTION {}", | |
socket.getRemoteSocketAddress()); | |
threadPool.submit(new Runnable() { | |
public void run() { | |
while (true) { | |
try { | |
BufferedReader reader = new BufferedReader( | |
new InputStreamReader(socket | |
.getInputStream(), | |
"UTF-8")); | |
String firstLine = reader.readLine(); | |
String line = firstLine; | |
while (line != null) { | |
if (line.length() == 0) | |
break; | |
line = reader.readLine(); | |
} | |
if (line == null) | |
break; | |
long sleep = processingDelay.get(); | |
logger.debug( | |
"{} {} {} ms", | |
firstLine, | |
socket.getRemoteSocketAddress(), | |
sleep); | |
TimeUnit.MILLISECONDS.sleep(sleep); | |
String response = "" | |
+ "HTTP/1.1 200 OK\r\n" | |
+ "Content-Length: 0\r\n" | |
+ "\r\n"; | |
OutputStream output = socket | |
.getOutputStream(); | |
output.write(response.getBytes("UTF-8")); | |
output.flush(); | |
} catch (Exception x) { | |
failures.add(x); | |
break; | |
} | |
} | |
} | |
}); | |
} catch (Exception x) { | |
failures.add(x); | |
break; | |
} | |
} | |
} | |
}); | |
final AtomicBoolean firstExpired = new AtomicBoolean(); | |
int count = 0; | |
int maxAdditionalRequest = 100; | |
int additionalRequests = 0; | |
for(int i=0; i < 1000; i++) { | |
TimeUnit.MILLISECONDS.sleep(1); // Just avoid being too fast | |
ContentExchange exchange = new ContentExchange(true) { | |
@Override | |
protected void onResponseComplete() throws IOException { | |
//logger.debug("{} {} OK", getMethod(), getRequestURI()); | |
} | |
@Override | |
protected void onExpire() { | |
//logger.debug("{} {} EXPIRED {}", getMethod(), | |
// getRequestURI(), this); | |
//firstExpired.compareAndSet(false, true); | |
} | |
}; | |
Address address = new Address("localhost", server.getLocalPort()); | |
exchange.setAddress(address); | |
exchange.setMethod("GET"); | |
exchange.setRequestURI("/" + count); | |
exchange.setVersion("HTTP/1.1"); | |
exchange.setRequestHeader("Host", address.toString()); | |
logger.debug("{} {} SENT", exchange.getMethod(), | |
exchange.getRequestURI()); | |
client.send(exchange); | |
++count; | |
} | |
//Send One more | |
ContentExchange exchange = new ContentExchange(true) { | |
@Override | |
protected void onResponseComplete() throws IOException { | |
//logger.debug("{} {} OK", getMethod(), getRequestURI()); | |
} | |
@Override | |
protected void onExpire() { | |
//logger.debug("{} {} EXPIRED {}", getMethod(), | |
// getRequestURI(), this); | |
//firstExpired.compareAndSet(false, true); | |
} | |
}; | |
Address address = new Address("localhost", server.getLocalPort()); | |
exchange.setAddress(address); | |
exchange.setMethod("GET"); | |
exchange.setRequestURI("/" + count); | |
exchange.setVersion("HTTP/1.1"); | |
exchange.setRequestHeader("Host", address.toString()); | |
logger.debug("{} {} SENT", exchange.getMethod(), | |
exchange.getRequestURI()); | |
//Allow enough time for any current connections to expire | |
Thread.sleep(30000); | |
//Send one more connection this one should not time out | |
client.send(exchange); | |
exchange.waitForDone(); | |
Assert.assertTrue(exchange.getStatus() != HttpExchange.STATUS_EXPIRED); | |
client.stop(); | |
Assert.assertTrue(failures.isEmpty()); | |
for (Socket socket : sockets) | |
socket.close(); | |
server.close(); | |
threadPool.shutdown(); | |
threadPool.awaitTermination(5, TimeUnit.SECONDS); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment