Created
May 24, 2011 21:01
-
-
Save rschildmeijer/989674 to your computer and use it in GitHub Desktop.
multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git src/main/java/org/deftserver/example/DeftServerExample.java src/main/java/org/deftserver/example/DeftServerExample.java | |
index ba3f422..1330d3c 100644 | |
--- src/main/java/org/deftserver/example/DeftServerExample.java | |
+++ src/main/java/org/deftserver/example/DeftServerExample.java | |
@@ -2,13 +2,11 @@ | |
import java.util.Map; | |
-import org.deftserver.io.IOLoop; | |
import org.deftserver.web.Application; | |
import org.deftserver.web.HttpServer; | |
import org.deftserver.web.handler.RequestHandler; | |
import org.deftserver.web.http.HttpRequest; | |
import org.deftserver.web.http.HttpResponse; | |
-import org.deftserver.web.http.HttpServerDescriptor; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
@@ -42,14 +40,16 @@ | |
Application application = new Application(handlers); | |
application.setStaticContentDir("static"); | |
- HttpServerDescriptor.KEEP_ALIVE_TIMEOUT = 30 * 1000; // 30s | |
- HttpServerDescriptor.READ_BUFFER_SIZE = 1500; // 1500 bytes | |
- HttpServerDescriptor.WRITE_BUFFER_SIZE = 1500; // 1500 bytes | |
+// HttpServerDescriptor.KEEP_ALIVE_TIMEOUT = 30 * 1000; // 30s | |
+// HttpServerDescriptor.READ_BUFFER_SIZE = 1500; // 1500 bytes | |
+// HttpServerDescriptor.WRITE_BUFFER_SIZE = 1500; // 1500 bytes | |
logger.debug("Starting up server on port: " + PORT); | |
HttpServer server = new HttpServer(application); | |
- server.listen(PORT); | |
- IOLoop.INSTANCE.start(); | |
+ //server.listen(PORT); | |
+ server.bind(PORT); | |
+ server.start(8); | |
+ //IOLoop.INSTANCE.start(); | |
} | |
} | |
diff --git src/main/java/org/deftserver/io/AsynchronousSocket.java src/main/java/org/deftserver/io/AsynchronousSocket.java | |
index 4969ff0..3d957ff 100644 | |
--- src/main/java/org/deftserver/io/AsynchronousSocket.java | |
+++ src/main/java/org/deftserver/io/AsynchronousSocket.java | |
@@ -23,6 +23,8 @@ | |
private static final Logger logger = LoggerFactory.getLogger(AsynchronousSocket.class); | |
+ private final IOLoop ioLoop; | |
+ | |
private final int DEFAULT_BYTEBUFFER_SIZE = 1024; | |
private final AsyncResult<String> nopAsyncStringResult = NopAsyncResult.of(String.class).nopAsyncResult; | |
@@ -48,19 +50,24 @@ | |
* Create a new {@code AsynchronousSocket} that will delegate its io operations the given {@code SelectableChannel}. | |
*/ | |
public AsynchronousSocket(SelectableChannel channel) { | |
+ this(IOLoop.INSTANCE, channel); | |
+ } | |
+ | |
+ public AsynchronousSocket(IOLoop ioLoop, SelectableChannel channel) { | |
+ this.ioLoop = ioLoop; | |
this.channel = channel; | |
interestOps = SelectionKey.OP_CONNECT; | |
if (channel instanceof SocketChannel && (((SocketChannel) channel).isConnected())) { | |
interestOps |= SelectionKey.OP_READ; | |
} | |
- IOLoop.INSTANCE.addHandler(channel, this, interestOps, null); | |
+ ioLoop.addHandler(channel, this, interestOps, null); | |
} | |
/** | |
* Connects to the given host port tuple and invokes the given callback when a successful connection is established. | |
*/ | |
public void connect(String host, int port, AsyncResult<Boolean> ccb) { | |
- IOLoop.INSTANCE.updateHandler(channel, interestOps |= SelectionKey.OP_CONNECT); | |
+ ioLoop.updateHandler(channel, interestOps |= SelectionKey.OP_CONNECT); | |
connectCallback = ccb; | |
if (channel instanceof SocketChannel) { | |
try { | |
@@ -79,7 +86,7 @@ | |
* Close the socket. | |
*/ | |
public void close() { | |
- Closeables.closeQuietly(channel); | |
+ Closeables.closeQuietly(ioLoop, channel); | |
invokeCloseCallback(); | |
} | |
@@ -110,7 +117,7 @@ | |
sc.finishConnect(); | |
invokeConnectSuccessfulCallback(); | |
interestOps &= ~SelectionKey.OP_CONNECT; | |
- IOLoop.INSTANCE.updateHandler(channel, interestOps |= SelectionKey.OP_READ); | |
+ ioLoop.updateHandler(channel, interestOps |= SelectionKey.OP_READ); | |
} catch (ConnectException e) { | |
logger.warn("Connect failed: {}", e.getMessage()); | |
invokeConnectFailureCallback(e); | |
@@ -128,7 +135,7 @@ | |
int read = ((SocketChannel) key.channel()).read(buffer); | |
if (read == -1) { // EOF | |
reachedEOF = true; | |
- IOLoop.INSTANCE.updateHandler(channel, interestOps &= ~SelectionKey.OP_READ); | |
+ ioLoop.updateHandler(channel, interestOps &= ~SelectionKey.OP_READ); | |
return; | |
} | |
readBuffer.append(new String(buffer.array(), 0, buffer.position(), Charsets.ISO_8859_1)); | |
@@ -253,15 +260,15 @@ | |
} catch (IOException e) { | |
logger.error("IOException during write: {}", e.getMessage()); | |
invokeCloseCallback(); | |
- Closeables.closeQuietly(channel); | |
+ Closeables.closeQuietly(ioLoop, channel); | |
} | |
writeBuffer.delete(0, written); | |
logger.debug("wrote: {} bytes", written); | |
logger.debug("writeBuffer size: {}", writeBuffer.length()); | |
if (writeBuffer.length() > 0) { | |
- IOLoop.INSTANCE.updateHandler(channel, interestOps |= SelectionKey.OP_WRITE); | |
+ ioLoop.updateHandler(channel, interestOps |= SelectionKey.OP_WRITE); | |
} else { | |
- IOLoop.INSTANCE.updateHandler(channel, interestOps &= ~SelectionKey.OP_WRITE); | |
+ ioLoop.updateHandler(channel, interestOps &= ~SelectionKey.OP_WRITE); | |
invokeWriteCallback(); | |
} | |
} | |
diff --git src/main/java/org/deftserver/io/IOLoop.java src/main/java/org/deftserver/io/IOLoop.java | |
index 869904c..89db7ca 100644 | |
--- src/main/java/org/deftserver/io/IOLoop.java | |
+++ src/main/java/org/deftserver/io/IOLoop.java | |
@@ -12,6 +12,7 @@ | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
+import java.util.concurrent.atomic.AtomicInteger; | |
import org.deftserver.io.callback.CallbackManager; | |
import org.deftserver.io.callback.JMXDebuggableCallbackManager; | |
@@ -27,9 +28,9 @@ | |
import com.google.common.collect.Lists; | |
import com.google.common.collect.Maps; | |
-public enum IOLoop implements IOLoopMXBean { | |
+public class IOLoop implements IOLoopMXBean { | |
- INSTANCE; | |
+ public static final IOLoop INSTANCE = new IOLoop(); | |
private boolean running = false; | |
@@ -41,8 +42,10 @@ | |
private final TimeoutManager tm = new JMXDebuggableTimeoutManager(); | |
private final CallbackManager cm = new JMXDebuggableCallbackManager(); | |
+ | |
+ private static final AtomicInteger sequence = new AtomicInteger(); | |
- private IOLoop() { | |
+ public IOLoop() { | |
try { | |
selector = Selector.open(); | |
} catch (IOException e) { | |
@@ -55,7 +58,8 @@ | |
* and will be the io loop thread. | |
*/ | |
public void start() { | |
- Thread.currentThread().setName("I/O-LOOP"); | |
+ Thread.currentThread().setName("I/O-LOOP" + sequence.incrementAndGet()); | |
+ logger.debug("starting ioloop"); | |
running = true; | |
long selectorTimeout = 250; // 250 ms | |
diff --git src/main/java/org/deftserver/io/callback/PeriodicCallback.java src/main/java/org/deftserver/io/callback/PeriodicCallback.java | |
index 01b102f..a5bd41c 100644 | |
--- src/main/java/org/deftserver/io/callback/PeriodicCallback.java | |
+++ src/main/java/org/deftserver/io/callback/PeriodicCallback.java | |
@@ -6,6 +6,7 @@ | |
public class PeriodicCallback { | |
+ private final IOLoop ioLoop; | |
private final AsyncCallback cb; | |
private final long period; | |
private boolean active = true; | |
@@ -16,6 +17,11 @@ | |
* @param period The period in ms | |
*/ | |
public PeriodicCallback(AsyncCallback cb, long period) { | |
+ this(IOLoop.INSTANCE, cb, period); | |
+ } | |
+ | |
+ public PeriodicCallback(IOLoop ioLoop, AsyncCallback cb, long period) { | |
+ this.ioLoop = ioLoop; | |
this.cb = cb; | |
this.period = period; | |
} | |
@@ -24,7 +30,7 @@ | |
* Start the {@code PeriodicCallback} | |
*/ | |
public void start() { | |
- IOLoop.INSTANCE.addTimeout( | |
+ ioLoop.addTimeout( | |
new Timeout( | |
System.currentTimeMillis() + period, | |
new AsyncCallback() { @Override public void onCallback() { run(); }} | |
diff --git src/main/java/org/deftserver/io/timeout/Timeout.java src/main/java/org/deftserver/io/timeout/Timeout.java | |
index 216f286..4d005db 100644 | |
--- src/main/java/org/deftserver/io/timeout/Timeout.java | |
+++ src/main/java/org/deftserver/io/timeout/Timeout.java | |
@@ -2,6 +2,7 @@ | |
import java.nio.channels.SelectableChannel; | |
+import org.deftserver.io.IOLoop; | |
import org.deftserver.util.Closeables; | |
import org.deftserver.web.AsyncCallback; | |
@@ -33,10 +34,10 @@ | |
return cancelled ? AsyncCallback.nopCb : cb; | |
} | |
- public static Timeout newKeepAliveTimeout(final SelectableChannel clientChannel, long keepAliveTimeout) { | |
+ public static Timeout newKeepAliveTimeout(final IOLoop ioLoop, final SelectableChannel clientChannel, long keepAliveTimeout) { | |
return new Timeout( | |
System.currentTimeMillis() + keepAliveTimeout, | |
- new AsyncCallback() { public void onCallback() { Closeables.closeQuietly(clientChannel); } } | |
+ new AsyncCallback() { public void onCallback() { Closeables.closeQuietly(ioLoop, clientChannel); } } | |
); | |
} | |
diff --git src/main/java/org/deftserver/util/Closeables.java src/main/java/org/deftserver/util/Closeables.java | |
index d71faf2..7e96553 100644 | |
--- src/main/java/org/deftserver/util/Closeables.java | |
+++ src/main/java/org/deftserver/util/Closeables.java | |
@@ -10,10 +10,14 @@ | |
private Closeables() {} | |
public static void closeQuietly(SelectableChannel channel) { | |
+ closeQuietly(IOLoop.INSTANCE, channel); | |
+ } | |
+ | |
+ public static void closeQuietly(IOLoop ioLoop, SelectableChannel channel) { | |
try { | |
- IOLoop.INSTANCE.removeHandler(channel); | |
+ ioLoop.removeHandler(channel); | |
com.google.common.io.Closeables.close(channel, true); | |
} catch (IOException ignore) { } | |
} | |
- | |
+ | |
} | |
diff --git src/main/java/org/deftserver/util/HttpUtil.java src/main/java/org/deftserver/util/HttpUtil.java | |
index effb482..11c874d 100644 | |
--- src/main/java/org/deftserver/util/HttpUtil.java | |
+++ src/main/java/org/deftserver/util/HttpUtil.java | |
@@ -1,7 +1,6 @@ | |
package org.deftserver.util; | |
import java.io.File; | |
-import java.math.BigInteger; | |
import java.security.MessageDigest; | |
import java.security.NoSuchAlgorithmException; | |
@@ -158,9 +157,10 @@ | |
public static String getEtag(byte[] bytes) { | |
- byte[] digest = md.digest(bytes); | |
- BigInteger number = new BigInteger(1, digest); | |
- return '0' + number.toString(16); // prepend a '0' to get a proper MD5 hash | |
+// byte[] digest = md.digest(bytes); | |
+// BigInteger number = new BigInteger(1, digest); | |
+// return '0' + number.toString(16); // prepend a '0' to get a proper MD5 hash | |
+ return "etag"; | |
} | |
diff --git src/main/java/org/deftserver/web/HttpServer.java src/main/java/org/deftserver/web/HttpServer.java | |
index b4c0b08..7116bc8 100644 | |
--- src/main/java/org/deftserver/web/HttpServer.java | |
+++ src/main/java/org/deftserver/web/HttpServer.java | |
@@ -4,13 +4,15 @@ | |
import java.net.InetSocketAddress; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.ServerSocketChannel; | |
+import java.util.List; | |
-import org.deftserver.io.IOHandler; | |
import org.deftserver.io.IOLoop; | |
import org.deftserver.util.Closeables; | |
import org.deftserver.web.http.HttpProtocol; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
+ | |
+import com.google.common.collect.Lists; | |
public class HttpServer { | |
@@ -20,17 +22,28 @@ | |
private static final int MAX_PORT_NUMBER = 65535; | |
private ServerSocketChannel serverChannel; | |
+ private final List<IOLoop> ioLoops = Lists.newLinkedList(); | |
+ | |
+ private final Application application; | |
- private final IOHandler protocol; | |
- | |
public HttpServer(Application application) { | |
- protocol = new HttpProtocol(application); | |
+ this.application = application; | |
} | |
/** | |
+ * To run Deft on multiple threads, first invoke {@link #bind(int)} and then {@link #start(int)} | |
+ * instead of {@link #listen(int)}; {@code listen} binds the port and registers the HTTP server with the single | |
+ * default IOLoop instance, {@code IOLoop.INSTANCE}. | |
+ * | |
* @return this for chaining purposes | |
*/ | |
public void listen(int port) { | |
+ bind(port); | |
+ ioLoops.add(IOLoop.INSTANCE); | |
+ registerHandler(IOLoop.INSTANCE, new HttpProtocol(application)); | |
+ } | |
+ | |
+ public void bind(int port) { | |
if (port <= MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) { | |
throw new IllegalArgumentException("Invalid port number. Valid range: [" + | |
MIN_PORT_NUMBER + ", " + MAX_PORT_NUMBER + ")"); | |
@@ -48,8 +61,22 @@ | |
serverChannel.socket().bind(endpoint); | |
} catch (IOException e) { | |
logger.error("Could not bind socket: {}", e); | |
+ } | |
+ } | |
+ | |
+ public void start(int numThreads) { | |
+ for (int i = 0; i < numThreads; i++) { | |
+ final IOLoop ioLoop = new IOLoop(); | |
+ ioLoops.add(ioLoop); | |
+ final HttpProtocol protocol = new HttpProtocol(ioLoop, application); | |
+ new Thread(new Runnable() { | |
+ | |
+ @Override public void run() { | |
+ registerHandler(ioLoop, protocol); | |
+ ioLoop.start(); | |
+ } | |
+ }).start(); | |
} | |
- registerHandler(); | |
} | |
/** | |
@@ -57,11 +84,13 @@ | |
*/ | |
public void stop() { | |
logger.debug("Stopping HTTP server"); | |
- Closeables.closeQuietly(serverChannel); | |
+ for (IOLoop ioLoop : ioLoops) { | |
+ Closeables.closeQuietly(ioLoop, serverChannel); | |
+ } | |
} | |
- private void registerHandler() { | |
- IOLoop.INSTANCE.addHandler( | |
+ private void registerHandler(IOLoop ioLoop, HttpProtocol protocol) { | |
+ ioLoop.addHandler( | |
serverChannel, | |
protocol, | |
SelectionKey.OP_ACCEPT, | |
diff --git src/main/java/org/deftserver/web/http/HttpProtocol.java src/main/java/org/deftserver/web/http/HttpProtocol.java | |
index 1ba187a..b5c4dfc 100644 | |
--- src/main/java/org/deftserver/web/http/HttpProtocol.java | |
+++ src/main/java/org/deftserver/web/http/HttpProtocol.java | |
@@ -28,12 +28,19 @@ | |
private final static Logger logger = LoggerFactory.getLogger(HttpProtocol.class); | |
+ private final IOLoop ioLoop; | |
private final Application application; | |
+ | |
// a queue of half-baked (pending/unfinished) HTTP post request | |
private final Map<SelectableChannel, PartialHttpRequest> partials = Maps.newHashMap(); | |
public HttpProtocol(Application app) { | |
+ this(IOLoop.INSTANCE, app); | |
+ } | |
+ | |
+ public HttpProtocol(IOLoop ioLoop, Application app) { | |
+ this.ioLoop = ioLoop; | |
application = app; | |
} | |
@@ -41,8 +48,11 @@ | |
public void handleAccept(SelectionKey key) throws IOException { | |
logger.debug("handle accept..."); | |
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); | |
- clientChannel.configureBlocking(false); | |
- IOLoop.INSTANCE.addHandler(clientChannel, this, SelectionKey.OP_READ, ByteBuffer.allocate(READ_BUFFER_SIZE)); | |
+ if (clientChannel != null) { | |
+ // accept() may return null in a multithreaded Deft environment: another IOLoop may already have accepted the pending connection | |
+ clientChannel.configureBlocking(false); | |
+ ioLoop.addHandler(clientChannel, this, SelectionKey.OP_READ, ByteBuffer.allocate(READ_BUFFER_SIZE)); | |
+ } | |
} | |
@Override | |
@@ -57,9 +67,9 @@ | |
HttpRequest request = getHttpRequest(key, clientChannel); | |
if (request.isKeepAlive()) { | |
- IOLoop.INSTANCE.addKeepAliveTimeout( | |
+ ioLoop.addKeepAliveTimeout( | |
clientChannel, | |
- Timeout.newKeepAliveTimeout(clientChannel, KEEP_ALIVE_TIMEOUT) | |
+ Timeout.newKeepAliveTimeout(ioLoop, clientChannel, KEEP_ALIVE_TIMEOUT) | |
); | |
} | |
HttpResponse response = new HttpResponse(this, key, request.isKeepAlive()); | |
@@ -82,7 +92,7 @@ | |
try { | |
toSend.flip(); // prepare for write | |
long bytesWritten = channel.write(toSend); | |
- if (IOLoop.INSTANCE.hasKeepAliveTimeout(channel)) { | |
+ if (ioLoop.hasKeepAliveTimeout(channel)) { | |
prolongKeepAliveTimeout(channel); | |
} | |
logger.debug("sent {} bytes to wire", bytesWritten); | |
@@ -94,32 +104,36 @@ | |
} | |
} catch (IOException e) { | |
logger.error("Failed to send data to client: {}", e.getMessage()); | |
- Closeables.closeQuietly(channel); | |
+ Closeables.closeQuietly(ioLoop, channel); | |
} | |
} | |
public void closeOrRegisterForRead(SelectionKey key) { | |
- if (key.isValid() && IOLoop.INSTANCE.hasKeepAliveTimeout(key.channel())) { | |
+ if (key.isValid() && ioLoop.hasKeepAliveTimeout(key.channel())) { | |
try { | |
key.channel().register(key.selector(), SelectionKey.OP_READ, reuseAttachment(key)); | |
prolongKeepAliveTimeout(key.channel()); | |
logger.debug("keep-alive connection. registrating for read."); | |
} catch (ClosedChannelException e) { | |
logger.debug("ClosedChannelException while registrating key for read: {}", e.getMessage()); | |
- Closeables.closeQuietly(key.channel()); | |
+ Closeables.closeQuietly(ioLoop, key.channel()); | |
} | |
} else { | |
// http request should be finished and no 'keep-alive' => close connection | |
logger.debug("Closing finished (non keep-alive) http connection"); | |
- Closeables.closeQuietly(key.channel()); | |
+ Closeables.closeQuietly(ioLoop, key.channel()); | |
} | |
} | |
public void prolongKeepAliveTimeout(SelectableChannel channel) { | |
- IOLoop.INSTANCE.addKeepAliveTimeout( | |
+ ioLoop.addKeepAliveTimeout( | |
channel, | |
- Timeout.newKeepAliveTimeout(channel, KEEP_ALIVE_TIMEOUT) | |
+ Timeout.newKeepAliveTimeout(ioLoop, channel, KEEP_ALIVE_TIMEOUT) | |
); | |
+ } | |
+ | |
+ public IOLoop getIOLoop() { | |
+ return ioLoop; | |
} | |
/** | |
@@ -148,7 +162,7 @@ | |
clientChannel.read(buffer); | |
} catch (IOException e) { | |
logger.warn("Could not read buffer: {}", e.getMessage()); | |
- Closeables.closeQuietly(clientChannel); | |
+ Closeables.closeQuietly(ioLoop, clientChannel); | |
} | |
buffer.flip(); | |
diff --git src/main/java/org/deftserver/web/http/HttpRequest.java src/main/java/org/deftserver/web/http/HttpRequest.java | |
index af6a546..f5849dc 100644 | |
--- src/main/java/org/deftserver/web/http/HttpRequest.java | |
+++ src/main/java/org/deftserver/web/http/HttpRequest.java | |
@@ -8,6 +8,7 @@ | |
import java.util.Map; | |
import java.util.regex.Pattern; | |
+import org.deftserver.io.IOLoop; | |
import org.deftserver.util.ArrayUtil; | |
import org.deftserver.web.HttpVerb; | |
@@ -15,6 +16,8 @@ | |
import com.google.common.collect.ImmutableMultimap; | |
public class HttpRequest { | |
+ | |
+ private IOLoop ioLoop; | |
private final String requestLine; | |
private final HttpVerb method; | |
@@ -117,6 +120,14 @@ | |
return new HttpRequest(unfinished.getRequestLine(), unfinished.getHeaders(), unfinished.getBody()); | |
} | |
} | |
+ | |
+ protected void setIOLoop(IOLoop ioLoop) { | |
+ this.ioLoop = ioLoop; | |
+ } | |
+ | |
+ public IOLoop getIOLoop() { | |
+ return ioLoop; | |
+ } | |
public String getRequestLine() { | |
return requestLine; | |
diff --git src/main/java/org/deftserver/web/http/HttpResponse.java src/main/java/org/deftserver/web/http/HttpResponse.java | |
index af569d8..5bac6e9 100644 | |
--- src/main/java/org/deftserver/web/http/HttpResponse.java | |
+++ src/main/java/org/deftserver/web/http/HttpResponse.java | |
@@ -11,7 +11,6 @@ | |
import java.util.HashMap; | |
import java.util.Map; | |
-import org.deftserver.io.IOLoop; | |
import org.deftserver.io.buffer.DynamicByteBuffer; | |
import org.deftserver.util.Closeables; | |
import org.deftserver.util.DateUtil; | |
@@ -79,10 +78,10 @@ | |
channel.write(responseData.getByteBuffer()); | |
} catch (IOException e) { | |
logger.error("ClosedChannelException during channel.write(): {}", e.getMessage()); | |
- Closeables.closeQuietly(key.channel()); | |
+ Closeables.closeQuietly(protocol.getIOLoop(), key.channel()); | |
} | |
long bytesFlushed = responseData.position(); | |
- if (IOLoop.INSTANCE.hasKeepAliveTimeout(channel)) { | |
+ if (protocol.getIOLoop().hasKeepAliveTimeout(channel)) { | |
protocol.prolongKeepAliveTimeout(channel); | |
} | |
if (responseData.hasRemaining()) { | |
@@ -91,7 +90,7 @@ | |
key.channel().register(key.selector(), SelectionKey.OP_WRITE); | |
} catch (ClosedChannelException e) { | |
logger.error("ClosedChannelException during flush(): {}", e.getMessage()); | |
- Closeables.closeQuietly(key.channel()); | |
+ Closeables.closeQuietly(protocol.getIOLoop(), key.channel()); | |
} | |
key.attach(responseData); | |
} else { | |
diff --git src/main/java/org/deftserver/web/http/client/AsynchronousHttpClient.java src/main/java/org/deftserver/web/http/client/AsynchronousHttpClient.java | |
index 22432d1..f8cb32e 100644 | |
--- src/main/java/org/deftserver/web/http/client/AsynchronousHttpClient.java | |
+++ src/main/java/org/deftserver/web/http/client/AsynchronousHttpClient.java | |
@@ -48,11 +48,19 @@ | |
private Timeout timeout; | |
+ private final IOLoop ioLoop; | |
+ | |
private static final String HTTP_VERSION = "HTTP/1.1\r\n"; | |
private static final String USER_AGENT_HEADER = "User-Agent: Deft AsynchronousHttpClient/0.2-SNAPSHOT\r\n"; | |
private static final String NEWLINE = "\r\n"; | |
- public AsynchronousHttpClient() { } | |
+ public AsynchronousHttpClient() { | |
+ this(IOLoop.INSTANCE); | |
+ } | |
+ | |
+ public AsynchronousHttpClient(IOLoop ioLoop) { | |
+ this.ioLoop = ioLoop; | |
+ } | |
/** | |
* Makes an asynchronous HTTP GET request against the specified url and invokes the given | |
@@ -106,7 +114,7 @@ | |
System.currentTimeMillis() + TIMEOUT, | |
new AsyncCallback() { public void onCallback() { onTimeout(); } } | |
); | |
- IOLoop.INSTANCE.addTimeout(timeout); | |
+ ioLoop.addTimeout(timeout); | |
} | |
private void cancelTimeout() { | |
diff --git src/test/java/org/deftserver/web/DeftSystemTest.java src/test/java/org/deftserver/web/DeftSystemTest.java | |
index 921e315..c8abd0f 100644 | |
--- src/test/java/org/deftserver/web/DeftSystemTest.java | |
+++ src/test/java/org/deftserver/web/DeftSystemTest.java | |
@@ -1078,7 +1078,7 @@ | |
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.listen(PORT+1); }}); | |
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.stop(); latch.countDown(); }}); | |
} | |
- latch.await(5, TimeUnit.SECONDS); | |
+ latch.await(50, TimeUnit.SECONDS); | |
assertEquals(0, latch.getCount()); | |
} | |
@@ -1152,7 +1152,7 @@ | |
}; | |
// make sure that the http.fetch(..) is invoked from the ioloop thread | |
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { http.fetch(url, cb); }}); | |
- latch.await(5, TimeUnit.SECONDS); | |
+ latch.await(15, TimeUnit.SECONDS); | |
assertEquals(0, latch.getCount()); | |
assertEquals("hello test", result[0]); | |
assertEquals("HTTP/1.1 200 OK", result[1]); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment