Skip to content

Instantly share code, notes, and snippets.

@rschildmeijer
Created March 5, 2011 17:58
Show Gist options
  • Save rschildmeijer/856549 to your computer and use it in GitHub Desktop.
Save rschildmeijer/856549 to your computer and use it in GitHub Desktop.
partial #102 (thread safe jmx)
diff --git src/main/java/org/deftserver/io/IOLoop.java src/main/java/org/deftserver/io/IOLoop.java
index 84aa5a3..fa978a0 100644
--- src/main/java/org/deftserver/io/IOLoop.java
+++ src/main/java/org/deftserver/io/IOLoop.java
@@ -9,10 +9,11 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.deftserver.io.callback.CallbackManager;
import org.deftserver.io.callback.JMXDebuggableCallbackManager;
@@ -153,13 +154,34 @@
// implements IOLoopMXBean
@Override
public int getNumberOfRegisteredIOHandlers() {
- return handlers.size();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final int[] rv = {0};
+ IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() {
+ rv[0] = handlers.size();
+ latch.countDown();
+ }});
+ try {
+ latch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("JMX call interrupted while waiting for IOLoop callback.");
+ }
+ return rv[0];
}
@Override
public List<String> getRegisteredIOHandlers() {
- Map<SelectableChannel, IOHandler> defensive = new HashMap<SelectableChannel, IOHandler>(handlers);
- Collection<String> readables = transform(defensive.values(), new Function<IOHandler, String>() {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Map<SelectableChannel, IOHandler> copy = Maps.newHashMap();
+ IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() {
+ copy.putAll(handlers);
+ latch.countDown();
+ }});
+ try {
+ latch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("JMX call interrupted while waiting for IOLoop callback.");
+ }
+ Collection<String> readables = transform(copy.values(), new Function<IOHandler, String>() {
@Override public String apply(IOHandler handler) { return handler.toString(); }
});
return Lists.newLinkedList(readables);
diff --git src/test/java/org/deftserver/web/DeftSystemTest.java src/test/java/org/deftserver/web/DeftSystemTest.java
index d43ab3e..17be341 100644
--- src/test/java/org/deftserver/web/DeftSystemTest.java
+++ src/test/java/org/deftserver/web/DeftSystemTest.java
@@ -11,6 +11,7 @@
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
@@ -20,6 +21,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
@@ -39,6 +41,7 @@
import org.apache.http.params.HttpProtocolParams;
import org.deftserver.example.kv.KeyValueStore;
import org.deftserver.example.kv.KeyValueStoreClient;
+import org.deftserver.io.AsynchronousSocket;
import org.deftserver.io.IOLoop;
import org.deftserver.io.timeout.Timeout;
import org.deftserver.web.handler.RequestHandler;
@@ -1046,9 +1049,54 @@
for (int i = 0; i < n; i++) {
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.listen(PORT+1); }});
IOLoop.INSTANCE.addCallback(new AsyncCallback() { public void onCallback() { server.stop(); latch.countDown(); }});
+ }
+ latch.await(5, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void concurrentJMXCalls() throws InterruptedException, IOException {
+ final int nJMXClients = 25;
+ final int[] exceptionThrown = {0};
+ ScheduledExecutorService jmxClients = Executors.newScheduledThreadPool(nJMXClients);
+ final CountDownLatch latch = new CountDownLatch(400);
+ final SelectableChannel dummyChannel = SocketChannel.open().configureBlocking(false);
+ final AsynchronousSocket socket= new AsynchronousSocket(dummyChannel);
+ final AsyncCallback addIOHandlerTask = new AsyncCallback() {
+ public void onCallback() {
+ try {
+ IOLoop.INSTANCE.addHandler(SocketChannel.open().configureBlocking(false), socket, 0, null);
+ dummyChannel.close(); // avoid too many files exception
+ } catch (Exception e) {
+ System.out.println("1: " + e);
+ exceptionThrown[0] = 1;
+ }
+ }};
+
+ final Runnable jmxTask = new Runnable() {
+
+ public void run() {
+ try {
+ IOLoop.INSTANCE.addCallback(addIOHandlerTask);
+ IOLoop.INSTANCE.getNumberOfRegisteredIOHandlers();
+ IOLoop.INSTANCE.getRegisteredIOHandlers();
+ } catch (Exception e) {
+ exceptionThrown[0] = 1;
+ System.out.println("2: " + e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ };
+
+ for (int i = 0; i < nJMXClients; i++) {
+ jmxClients.scheduleWithFixedDelay(jmxTask, 0, 10, TimeUnit.NANOSECONDS);
}
+
latch.await(5, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
+ assertEquals(0, exceptionThrown[0]);
+ jmxClients.shutdown();
}
public String convertStreamToString(InputStream is) throws IOException {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment