Created
March 5, 2011 17:58
-
-
Save rschildmeijer/856549 to your computer and use it in GitHub Desktop.
partial #102 (thread safe jmx)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git src/main/java/org/deftserver/io/IOLoop.java src/main/java/org/deftserver/io/IOLoop.java | |
index 84aa5a3..fa978a0 100644 | |
--- src/main/java/org/deftserver/io/IOLoop.java | |
+++ src/main/java/org/deftserver/io/IOLoop.java | |
@@ -9,10 +9,11 @@ | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.util.Collection; | |
-import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
+import java.util.concurrent.CountDownLatch; | |
+import java.util.concurrent.TimeUnit; | |
import org.deftserver.io.callback.CallbackManager; | |
import org.deftserver.io.callback.JMXDebuggableCallbackManager; | |
@@ -153,13 +154,34 @@ | |
// implements IOLoopMXBean | |
@Override | |
public int getNumberOfRegisteredIOHandlers() { | |
- return handlers.size(); | |
+ final CountDownLatch latch = new CountDownLatch(1); | |
+ final int[] rv = {0}; | |
+ IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { | |
+ rv[0] = handlers.size(); | |
+ latch.countDown(); | |
+ }}); | |
+ try { | |
+ latch.await(5, TimeUnit.SECONDS); | |
+ } catch (InterruptedException e) { | |
+ logger.warn("JMX call interrupted while waiting for IOLoop callback."); | |
+ } | |
+ return rv[0]; | |
} | |
@Override | |
public List<String> getRegisteredIOHandlers() { | |
- Map<SelectableChannel, IOHandler> defensive = new HashMap<SelectableChannel, IOHandler>(handlers); | |
- Collection<String> readables = transform(defensive.values(), new Function<IOHandler, String>() { | |
+ final CountDownLatch latch = new CountDownLatch(1); | |
+ final Map<SelectableChannel, IOHandler> copy = Maps.newHashMap(); | |
+ IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { | |
+ copy.putAll(handlers); | |
+ latch.countDown(); | |
+ }}); | |
+ try { | |
+ latch.await(5, TimeUnit.SECONDS); | |
+ } catch (InterruptedException e) { | |
+ logger.warn("JMX call interrupted while waiting for IOLoop callback."); | |
+ } | |
+ Collection<String> readables = transform(copy.values(), new Function<IOHandler, String>() { | |
@Override public String apply(IOHandler handler) { return handler.toString(); } | |
}); | |
return Lists.newLinkedList(readables); | |
diff --git src/test/java/org/deftserver/web/DeftSystemTest.java src/test/java/org/deftserver/web/DeftSystemTest.java | |
index d43ab3e..17be341 100644 | |
--- src/test/java/org/deftserver/web/DeftSystemTest.java | |
+++ src/test/java/org/deftserver/web/DeftSystemTest.java | |
@@ -11,6 +11,7 @@ | |
import java.io.InputStreamReader; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
+import java.nio.channels.SelectableChannel; | |
import java.nio.channels.SocketChannel; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
@@ -20,6 +21,7 @@ | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
+import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.http.Header; | |
@@ -39,6 +41,7 @@ | |
import org.apache.http.params.HttpProtocolParams; | |
import org.deftserver.example.kv.KeyValueStore; | |
import org.deftserver.example.kv.KeyValueStoreClient; | |
+import org.deftserver.io.AsynchronousSocket; | |
import org.deftserver.io.IOLoop; | |
import org.deftserver.io.timeout.Timeout; | |
import org.deftserver.web.handler.RequestHandler; | |
@@ -1046,9 +1049,54 @@ | |
for (int i = 0; i < n; i++) { | |
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.listen(PORT+1); }}); | |
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.stop(); latch.countDown(); }}); | |
+ } | |
+ latch.await(5, TimeUnit.SECONDS); | |
+ assertEquals(0, latch.getCount()); | |
+ } | |
+ | |
+ @Test | |
+ public void concurrentJMXCalls() throws InterruptedException, IOException { | |
+ final int nJMXClients = 25; | |
+ final int[] exceptionThrown = {0}; | |
+ ScheduledExecutorService jmxClients = Executors.newScheduledThreadPool(nJMXClients); | |
+ final CountDownLatch latch = new CountDownLatch(400); | |
+ final SelectableChannel dummyChannel = SocketChannel.open().configureBlocking(false); | |
+ final AsynchronousSocket socket= new AsynchronousSocket(dummyChannel); | |
+ final AsyncCallback addIOHandlerTask = new AsyncCallback() { | |
+ public void onCallback() { | |
+ try { | |
+ IOLoop.INSTANCE.addHandler(SocketChannel.open().configureBlocking(false), socket, 0, null); | |
+ dummyChannel.close(); // avoid too many files exception | |
+ } catch (Exception e) { | |
+ System.out.println("1: " + e); | |
+ exceptionThrown[0] = 1; | |
+ } | |
+ }}; | |
+ | |
+ final Runnable jmxTask = new Runnable() { | |
+ | |
+ public void run() { | |
+ try { | |
+ IOLoop.INSTANCE.addCallback(addIOHandlerTask); | |
+ IOLoop.INSTANCE.getNumberOfRegisteredIOHandlers(); | |
+ IOLoop.INSTANCE.getRegisteredIOHandlers(); | |
+ } catch (Exception e) { | |
+ exceptionThrown[0] = 1; | |
+ System.out.println("2: " + e); | |
+ } finally { | |
+ latch.countDown(); | |
+ } | |
+ } | |
+ }; | |
+ | |
+ for (int i = 0; i < nJMXClients; i++) { | |
+ jmxClients.scheduleWithFixedDelay(jmxTask, 0, 10, TimeUnit.NANOSECONDS); | |
} | |
+ | |
latch.await(5, TimeUnit.SECONDS); | |
assertEquals(0, latch.getCount()); | |
+ assertEquals(0, exceptionThrown[0]); | |
+ jmxClients.shutdown(); | |
} | |
public String convertStreamToString(InputStream is) throws IOException { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment