Skip to content

Instantly share code, notes, and snippets.

@Arbow
Created June 2, 2011 09:01
Show Gist options
  • Save Arbow/1004144 to your computer and use it in GitHub Desktop.
Save Arbow/1004144 to your computer and use it in GitHub Desktop.
Netty 3.2.4 idle event trigger patch
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java
--- a/src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800
@@ -17,6 +17,7 @@
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultChannelConfig;
@@ -66,6 +67,12 @@
setSoLinger(ConversionUtil.toInt(value));
} else if (key.equals("trafficClass")) {
setTrafficClass(ConversionUtil.toInt(value));
+ } else if (key.equals("readerIdleTime")) {
+ setReaderIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS);
+ } else if (key.equals("writerIdleTime")) {
+ setWriterIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS);
+ } else if (key.equals("allIdleTime")) {
+ setAllIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS);
} else {
return false;
}
@@ -192,4 +199,34 @@
throw new ChannelException(e);
}
}
+
+ private long readerIdleTimeMillis;
+
+ private long writerIdleTimeMillis;
+
+ private long allIdleTimeMillis;
+
+ public void setReaderIdleTime(long readerIdleTime, TimeUnit unit) {
+ readerIdleTimeMillis = unit.toMillis(readerIdleTime);
+ }
+
+ public void setWriterIdleTime(long writerIdleTime, TimeUnit unit) {
+ writerIdleTimeMillis = unit.toMillis(writerIdleTime);
+ }
+
+ public void setAllIdleTime(long allIdleTime, TimeUnit unit) {
+ allIdleTimeMillis = unit.toMillis(allIdleTime);
+ }
+
+ public long getReaderIdleTime(TimeUnit unit) {
+ return unit.convert(readerIdleTimeMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public long getWriterIdleTime(TimeUnit unit) {
+ return unit.convert(writerIdleTimeMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public long getAllIdleTime(TimeUnit unit) {
+ return unit.convert(allIdleTimeMillis, TimeUnit.MILLISECONDS);
+ }
}
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java
--- a/src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800
@@ -16,8 +16,11 @@
package org.jboss.netty.channel.socket;
import java.net.Socket;
+import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.ChannelConfig;
+import org.jboss.netty.handler.timeout.IdleState;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
/**
* A {@link ChannelConfig} for a {@link SocketChannel}.
@@ -130,4 +133,43 @@
*/
void setPerformancePreferences(
int connectionTime, int latency, int bandwidth);
+
+ /**
+ * An {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
+ * will be triggered when no read was performed for the specified period of
+ * time. Specify {@code 0} to disable.
+ *
+ * @param readerIdleTime
+ * @param unit
+ * the {@link TimeUnit} of {@code readerIdleTime}
+ */
+ void setReaderIdleTime(long readerIdleTime, TimeUnit unit);
+
+ long getReaderIdleTime(TimeUnit unit);
+
+ /**
+ * An {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
+ * will be triggered when no write was performed for the specified period of
+ * time. Specify {@code 0} to disable.
+ *
+ * @param writerIdleTime
+ * @param unit
+ * the {@link TimeUnit} {@code writeIdleTime}
+ */
+ void setWriterIdleTime(long writerIdleTime, TimeUnit unit);
+
+ long getWriterIdleTime(TimeUnit unit);
+
+ /**
+ * An {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE} will
+ * be triggered when neither read nor write was performed for the specified
+ * period of time. Specify {@code 0} to disable.
+ *
+ * @param allIdleTime
+ * @param unit
+ * the {@link TimeUnit} of {@code allIdleTime}
+ */
+ void setAllIdleTime(long allIdleTime, TimeUnit unit);
+
+ long getAllIdleTime(TimeUnit unit);
}
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java
--- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -321,4 +322,28 @@
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
channel.realChannel.getConfig().setPipelineFactory(pipelineFactory);
}
+
+ public void setReaderIdleTime(long readerIdleTime, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getReaderIdleTime(TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setWriterIdleTime(long writerIdleTime, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getWriterIdleTime(TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setAllIdleTime(long allIdleTime, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getAllIdleTime(TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
}
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java Thu Jun 02 16:55:06 2011 +0800
@@ -34,6 +34,7 @@
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
+import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
@@ -74,6 +75,15 @@
MessageEvent currentWriteEvent;
SendBuffer currentWriteBuffer;
+ volatile long lastWriteOperationTime;
+ long lastReadOperationTime;
+ long lastReaderIdleEventTime; // last READER_IDLE reported time
+ long lastWriterIdleEventTime; // last WRITER_IDLE reported time
+ long lastAllIdleEventTime; // last ALL_IDLE reported time
+ Timeout readIdleCheckTimeout;
+ Timeout writeIdleCheckTimeout;
+ Timeout allIdleCheckTimeout;
+
public NioSocketChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java Thu Jun 02 16:55:06 2011 +0800
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -44,9 +45,14 @@
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
+import org.jboss.netty.handler.timeout.DefaultIdleStateEvent;
+import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue;
@@ -83,10 +89,16 @@
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
+ private final HashedWheelTimer idleCheckTimer;
+ private final Queue<NioSocketChannel> idleChannels;
+
NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId;
this.id = id;
this.executor = executor;
+ this.idleCheckTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
+ this.idleCheckTimer.start();
+ this.idleChannels = new LinkedTransferQueue<NioSocketChannel>();
}
void register(NioSocketChannel channel, ChannelFuture future) {
@@ -137,6 +149,8 @@
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
+
+ registerIdleCheckTasks(channel);
}
if (wakenUp.compareAndSet(false, true)) {
@@ -144,6 +158,135 @@
}
}
+ private void registerIdleCheckTasks(final NioSocketChannel channel) {
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS);
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS);
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS);
+ TimerTask idleCheckTask = new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ idleChannels.offer(channel);
+ }
+ };
+ if (readerIdleTime > 0) {
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, readerIdleTime, TimeUnit.MILLISECONDS);
+ }
+ if (writerIdleTime > 0) {
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, writerIdleTime, TimeUnit.MILLISECONDS);
+ }
+ if (allIdleTime > 0) {
+ channel.allIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, allIdleTime, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void updateChannelReadIdleCheck(final NioSocketChannel channel) {
+ channel.lastReadOperationTime = System.currentTimeMillis();
+ if (channel.readIdleCheckTimeout != null) {
+ channel.readIdleCheckTimeout.cancel();
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS);
+ TimerTask idleCheckTask = new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ idleChannels.offer(channel);
+ }
+ };
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, readerIdleTime, TimeUnit.MILLISECONDS);
+ }
+ updateChannelAllIdleCheck(channel);
+ }
+
+ private void updateChannelWriteIdleCheck(final NioSocketChannel channel) {
+ channel.lastWriteOperationTime = System.currentTimeMillis();
+ if (channel.writeIdleCheckTimeout != null) {
+ channel.writeIdleCheckTimeout.cancel();
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS);
+ TimerTask idleCheckTask = new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ idleChannels.offer(channel);
+ }
+ };
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, writerIdleTime, TimeUnit.MILLISECONDS);
+ }
+ updateChannelAllIdleCheck(channel);
+ }
+
+ private void updateChannelAllIdleCheck(final NioSocketChannel channel) {
+ if (channel.allIdleCheckTimeout != null) {
+ channel.allIdleCheckTimeout.cancel();
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS);
+ TimerTask idleCheckTask = new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ idleChannels.offer(channel);
+ }
+ };
+ channel.allIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, allIdleTime, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void fireChannelIdle(Channel channel, IdleState state, long lastActivityTimeMillis) {
+ channel.getPipeline().sendUpstream(new DefaultIdleStateEvent(channel, state, lastActivityTimeMillis));
+ }
+
+ private boolean checkIdleState(Channel channel, IdleState idleState, long idleTimeout, long currentTime,
+ long lastIoTime) {
+ if (idleTimeout > 0) {
+ long delta = currentTime - lastIoTime;
+
+ if (delta >= idleTimeout) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void processIdleChannels() {
+ if (!idleChannels.isEmpty()) {
+ for (final NioSocketChannel channel:idleChannels) {
+ long currentTime = System.currentTimeMillis();
+ if (!channel.isConnected()) {
+ idleChannels.remove(channel);
+ continue;
+ }
+
+ //check if really idle again
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS);
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS);
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS);
+ long maxReadIdleTime = Math.max(readerIdleTime, allIdleTime);
+ long maxWriteIdleTime = Math.max(writerIdleTime, allIdleTime);
+ final TimerTask idleCheckTask = new TimerTask() {
+ public void run(Timeout timeout) throws Exception {
+ idleChannels.offer(channel);
+ }
+ };
+
+ if (checkIdleState(channel, IdleState.READER_IDLE, readerIdleTime, currentTime,
+ Math.max(channel.lastReadOperationTime, channel.lastReaderIdleEventTime))) {
+ channel.lastReaderIdleEventTime = currentTime;
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxReadIdleTime, TimeUnit.MILLISECONDS);
+ fireChannelIdle(channel, IdleState.READER_IDLE, channel.lastReadOperationTime);
+ }
+
+ if (checkIdleState(channel, IdleState.WRITER_IDLE, writerIdleTime, currentTime,
+ Math.max(channel.lastWriteOperationTime, channel.lastWriterIdleEventTime))) {
+ channel.lastWriterIdleEventTime = currentTime;
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxWriteIdleTime, TimeUnit.MILLISECONDS);
+ fireChannelIdle(channel, IdleState.WRITER_IDLE, channel.lastWriteOperationTime);
+ }
+
+ long lastIoTime = Math.max(channel.lastReadOperationTime, channel.lastWriteOperationTime);
+ if (checkIdleState(channel, IdleState.ALL_IDLE, allIdleTime, currentTime,
+ Math.max(lastIoTime, channel.lastAllIdleEventTime))) {
+ channel.lastAllIdleEventTime = currentTime;
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxReadIdleTime, TimeUnit.MILLISECONDS);
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxWriteIdleTime, TimeUnit.MILLISECONDS);
+ fireChannelIdle(channel, IdleState.ALL_IDLE, lastIoTime);
+ }
+
+ idleChannels.remove(channel);
+ }
+ }
+ }
+
public void run() {
thread = Thread.currentThread();
@@ -198,13 +341,17 @@
processRegisterTaskQueue();
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys());
+
+ Set<SelectionKey> keys = selector.keys();
+
+ processIdleChannels();
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
- if (selector.keys().isEmpty()) {
+ if (keys.isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
@@ -244,6 +391,7 @@
}
}
}
+ this.idleCheckTimer.stop();
}
private void processRegisterTaskQueue() throws IOException {
@@ -347,6 +495,7 @@
// Fire the event.
fireMessageReceived(channel, buffer);
+ updateChannelReadIdleCheck(channel);
} else {
recvBufferPool.release(bb);
}
@@ -475,6 +624,8 @@
break;
}
}
+
+ updateChannelWriteIdleCheck(channel);
if (buf.finished()) {
// Successful write - proceed to the next message.
@@ -776,6 +927,10 @@
"Failed to register a socket to the selector.", e);
}
}
+
+ // TODO where should it be placed?
+ channel.lastReadOperationTime = System.currentTimeMillis();
+ channel.lastWriteOperationTime = System.currentTimeMillis();
if (!server) {
if (!((NioClientSocketChannel) channel).boundManually) {
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/util/internal/ConversionUtil.java
--- a/src/main/java/org/jboss/netty/util/internal/ConversionUtil.java Thu Jun 02 14:13:48 2011 +0800
+++ b/src/main/java/org/jboss/netty/util/internal/ConversionUtil.java Thu Jun 02 16:55:06 2011 +0800
@@ -42,6 +42,17 @@
}
/**
+ * Converts the specified object into an long.
+ */
+ public static long toLong(Object value) {
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else {
+ return Long.parseLong(String.valueOf(value));
+ }
+ }
+
+ /**
* Converts the specified object into a boolean.
*/
public static boolean toBoolean(Object value) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment