Skip to content

Instantly share code, notes, and snippets.

@Pandragon
Created July 31, 2014 12:56
Show Gist options
  • Save Pandragon/1b19a2684c6cb8d87cee to your computer and use it in GitHub Desktop.
Save Pandragon/1b19a2684c6cb8d87cee to your computer and use it in GitHub Desktop.
### Eclipse Workspace Patch 1.0
#P L2J_Server_BETA
Index: .classpath
===================================================================
--- .classpath (revision 6599)
+++ .classpath (working copy)
@@ -1,16 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry kind="src" path="java" />
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8" />
- <classpathentry kind="lib" path="dist/libs/c3p0-0.9.5-pre6.jar" />
- <classpathentry kind="lib" path="dist/libs/javolution-5.5.1.jar" sourcepath="dist/libs/javolution-5.5.1-src.zip" />
- <classpathentry kind="lib" path="dist/libs/jython.jar" />
- <classpathentry kind="lib" path="dist/libs/jython-engine-2.2.1.jar" />
- <classpathentry kind="lib" path="dist/libs/mail-1.5.0.jar" />
- <classpathentry kind="lib" path="dist/libs/mmocore.jar" />
- <classpathentry kind="lib" path="dist/libs/netcon-1.7.jar" />
- <classpathentry kind="lib" path="dist/libs/Subnet-1.0.jar" />
- <classpathentry kind="lib" path="dist/libs/L2J_GeoAbstraction.jar" />
- <classpathentry kind="lib" path="dist/libs/weupnp-0.1.3.jar" />
- <classpathentry kind="output" path="bin" />
+ <classpathentry kind="src" path="java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
+ <classpathentry kind="lib" path="dist/libs/c3p0-0.9.5-pre6.jar"/>
+ <classpathentry kind="lib" path="dist/libs/javolution-5.5.1.jar" sourcepath="dist/libs/javolution-5.5.1-src.zip"/>
+ <classpathentry kind="lib" path="dist/libs/jython.jar"/>
+ <classpathentry kind="lib" path="dist/libs/jython-engine-2.2.1.jar"/>
+ <classpathentry kind="lib" path="dist/libs/mail-1.5.0.jar"/>
+ <classpathentry kind="lib" path="dist/libs/netcon-1.7.jar"/>
+ <classpathentry kind="lib" path="dist/libs/Subnet-1.0.jar"/>
+ <classpathentry kind="lib" path="dist/libs/L2J_GeoAbstraction.jar"/>
+ <classpathentry kind="lib" path="dist/libs/weupnp-0.1.3.jar"/>
+ <classpathentry kind="lib" path="dist/libs/asyncmmocore.jar"/>
+ <classpathentry kind="output" path="bin"/>
</classpath>
Index: java/com/l2jserver/gameserver/GameServer.java
===================================================================
--- java/com/l2jserver/gameserver/GameServer.java (revision 6599)
+++ java/com/l2jserver/gameserver/GameServer.java (working copy)
@@ -30,8 +30,8 @@
import java.util.logging.LogManager;
import java.util.logging.Logger;
-import org.mmocore.network.SelectorConfig;
-import org.mmocore.network.SelectorThread;
+import org.mmocore.network.Core;
+import org.mmocore.network.CoreConfig;
import com.l2jserver.Config;
import com.l2jserver.L2DatabaseFactory;
@@ -148,7 +148,7 @@
{
private static final Logger _log = Logger.getLogger(GameServer.class.getName());
- private final SelectorThread<L2GameClient> _selectorThread;
+ private final Core<L2GameClient> _networkCore;
private final L2GamePacketHandler _gamePacketHandler;
private final DeadLockDetector _deadDetectThread;
private final IdFactory _idFactory;
@@ -162,9 +162,9 @@
return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1048576; // ;
}
- public SelectorThread<L2GameClient> getSelectorThread()
+ public Core<L2GameClient> getNetworkCore()
{
- return _selectorThread;
+ return _networkCore;
}
public L2GamePacketHandler getL2GamePacketHandler()
@@ -426,15 +426,14 @@
CommunityServerThread.initialize();
- final SelectorConfig sc = new SelectorConfig();
+ final CoreConfig sc = new CoreConfig();
sc.MAX_READ_PER_PASS = Config.MMO_MAX_READ_PER_PASS;
sc.MAX_SEND_PER_PASS = Config.MMO_MAX_SEND_PER_PASS;
sc.SLEEP_TIME = Config.MMO_SELECTOR_SLEEP_TIME;
sc.HELPER_BUFFER_COUNT = Config.MMO_HELPER_BUFFER_COUNT;
- sc.TCP_NODELAY = Config.MMO_TCP_NODELAY;
_gamePacketHandler = new L2GamePacketHandler();
- _selectorThread = new SelectorThread<>(sc, _gamePacketHandler, _gamePacketHandler, _gamePacketHandler, new IPv4Filter());
+ _networkCore = new Core<>(sc, _gamePacketHandler, _gamePacketHandler, _gamePacketHandler, new IPv4Filter());
InetAddress bindAddress = null;
if (!Config.GAMESERVER_HOSTNAME.equals("*"))
@@ -451,8 +450,7 @@
try
{
- _selectorThread.openServerSocket(bindAddress, Config.PORT_GAME);
- _selectorThread.start();
+ _networkCore.openServerSocket(bindAddress, Config.PORT_GAME);
_log.log(Level.INFO, getClass().getSimpleName() + ": is now listening on: " + Config.GAMESERVER_HOSTNAME + ":" + Config.PORT_GAME);
}
catch (IOException e)
Index: java/com/l2jserver/loginserver/L2LoginServer.java
===================================================================
--- java/com/l2jserver/loginserver/L2LoginServer.java (revision 6599)
+++ java/com/l2jserver/loginserver/L2LoginServer.java (working copy)
@@ -32,8 +32,8 @@
import java.util.logging.LogManager;
import java.util.logging.Logger;
-import org.mmocore.network.SelectorConfig;
-import org.mmocore.network.SelectorThread;
+import org.mmocore.network.Core;
+import org.mmocore.network.CoreConfig;
import com.l2jserver.Config;
import com.l2jserver.L2DatabaseFactory;
@@ -54,7 +54,7 @@
public static final int PROTOCOL_REV = 0x0106;
private static L2LoginServer _instance;
private GameServerListener _gameServerListener;
- private SelectorThread<L2LoginClient> _selectorThread;
+ private Core<L2LoginClient> _networkCore;
private Status _statusServer;
private Thread _restartLoginServer;
@@ -138,19 +138,23 @@
}
}
- final SelectorConfig sc = new SelectorConfig();
+ final CoreConfig sc = new CoreConfig();
sc.MAX_READ_PER_PASS = Config.MMO_MAX_READ_PER_PASS;
sc.MAX_SEND_PER_PASS = Config.MMO_MAX_SEND_PER_PASS;
sc.SLEEP_TIME = Config.MMO_SELECTOR_SLEEP_TIME;
sc.HELPER_BUFFER_COUNT = Config.MMO_HELPER_BUFFER_COUNT;
+ sc.ASYNC_THREAD_POOL_SIZE = 2;
+ sc.WORKERS_THREAD_POOL_SIZE = 2;
+ sc.NATIVE_BUF_POOL_SIZE = 2;
+ sc.STRING_BUF_POOL_SIZE = 2;
final L2LoginPacketHandler lph = new L2LoginPacketHandler();
final SelectorHelper sh = new SelectorHelper();
try
{
- _selectorThread = new SelectorThread<>(sc, sh, lph, sh, sh);
+ _networkCore = new Core<>(sc, sh, lph, sh, sh);
}
- catch (IOException e)
+ catch (Exception e)
{
_log.log(Level.SEVERE, "FATAL: Failed to open Selector. Reason: " + e.getMessage(), e);
System.exit(1);
@@ -187,8 +191,7 @@
try
{
- _selectorThread.openServerSocket(bindAddress, Config.PORT_LOGIN);
- _selectorThread.start();
+ _networkCore.openServerSocket(bindAddress, Config.PORT_LOGIN);
_log.log(Level.INFO, getClass().getSimpleName() + ": is now listening on: " + Config.LOGIN_BIND_ADDRESS + ":" + Config.PORT_LOGIN);
}
catch (IOException e)
Index: java/com/l2jserver/gameserver/Shutdown.java
===================================================================
--- java/com/l2jserver/gameserver/Shutdown.java (revision 6599)
+++ java/com/l2jserver/gameserver/Shutdown.java (working copy)
@@ -272,7 +272,7 @@
// saveData sends messages to exit players, so shutdown selector after it
try
{
- GameServer.gameServer.getSelectorThread().shutdown();
+ GameServer.gameServer.getNetworkCore().shutdown();
_log.info("Game Server: Selector thread has been shut down(" + tc.getEstimatedTimeAndRestartCounter() + "ms).");
}
catch (Throwable t)
Index: java/com/l2jserver/loginserver/SelectorHelper.java
===================================================================
--- java/com/l2jserver/loginserver/SelectorHelper.java (revision 6599)
+++ java/com/l2jserver/loginserver/SelectorHelper.java (working copy)
@@ -18,7 +18,8 @@
*/
package com.l2jserver.loginserver;
-import java.nio.channels.SocketChannel;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -62,8 +63,17 @@
}
@Override
- public boolean accept(SocketChannel sc)
+ public boolean accept(AsynchronousSocketChannel sc)
{
- return _ipv4filter.accept(sc) && !LoginController.getInstance().isBannedAddress(sc.socket().getInetAddress());
+ try
+ {
+ InetSocketAddress isaddr = (InetSocketAddress) sc.getRemoteAddress();
+ return _ipv4filter.accept(sc) && !LoginController.getInstance().isBannedAddress(isaddr.getAddress());
+ }
+ catch (Exception e)
+ {
+ // Deliberately ignored: any failure while resolving or checking the remote address falls through to the rejection below.
+ }
+ return false;
}
}
Index: java/com/l2jserver/util/IPv4Filter.java
===================================================================
--- java/com/l2jserver/util/IPv4Filter.java (revision 6599)
+++ java/com/l2jserver/util/IPv4Filter.java (working copy)
@@ -19,7 +19,8 @@
package com.l2jserver.util;
import java.net.InetAddress;
-import java.nio.channels.SocketChannel;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousSocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -65,9 +66,19 @@
}
@Override
- public boolean accept(SocketChannel sc)
+ public boolean accept(AsynchronousSocketChannel sc)
{
- InetAddress addr = sc.socket().getInetAddress();
+ InetAddress addr = null;
+ try
+ {
+ InetSocketAddress isaddr = (InetSocketAddress) sc.getRemoteAddress();
+ addr = isaddr.getAddress();
+ }
+ catch (Exception e)
+ {
+ return false;
+ }
+
int h = hash(addr.getAddress());
long current = System.currentTimeMillis();
#P L2J_DataPack_BETA
Index: .classpath
===================================================================
--- .classpath (revision 10393)
+++ .classpath (working copy)
@@ -1,11 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry including="**/*.java" kind="src" path="dist/game/data/scripts" />
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8" />
- <classpathentry combineaccessrules="false" kind="src" path="/L2J_Server_BETA" />
- <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/javolution-5.5.1.jar" />
- <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/mmocore.jar" />
- <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/netcon-1.7.jar" />
- <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/L2J_GeoAbstraction.jar" />
- <classpathentry kind="output" path="bin" />
+ <classpathentry including="**/*.java" kind="src" path="dist/game/data/scripts"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/L2J_Server_BETA"/>
+ <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/javolution-5.5.1.jar"/>
+ <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/netcon-1.7.jar"/>
+ <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/L2J_GeoAbstraction.jar"/>
+ <classpathentry kind="lib" path="/L2J_Server_BETA/dist/libs/asyncmmocore.jar"/>
+ <classpathentry kind="output" path="bin"/>
</classpath>
### Eclipse Workspace Patch 1.0
#P MMOCore
Index: src/org/mmocore/network/WriteHandler.java
===================================================================
--- src/org/mmocore/network/WriteHandler.java (revision 0)
+++ src/org/mmocore/network/WriteHandler.java (working copy)
@@ -0,0 +1,68 @@
+package org.mmocore.network;
+
+/**
+ * Completion handler invoked when an asynchronous write on an {@link MMOConnection} finishes.
+ * <p>
+ * On success the connection's temporary write buffer is returned to the buffer pool, the
+ * pending-write flag is cleared and the connection is asked to run its next write task. On any
+ * error the client is notified and the connection is closed.
+ * @author BiggBoss
+ * @param <T> the client type attached to the connections
+ */
+public final class WriteHandler<T extends MMOClient<?>> extends AbstractWriteHandler<T>
+{
+    WriteHandler(final Core<T> corePtr)
+    {
+        super(corePtr);
+    }
+
+    /**
+     * Processes the result of a finished write operation.
+     * @param result value reported for the write; anything below zero is handled as an error
+     * @param con the connection the write was issued for
+     */
+    @Override
+    public void completed(Integer result, MMOConnection<T> con)
+    {
+        // Error path first: a negative result drops the connection.
+        if (result < 0)
+        {
+            dropConnection(con);
+            return;
+        }
+
+        // The reported count differs from the size recorded for this pass: the temporary buffer
+        // is passed to createWriteBuffer first (presumably to keep the unsent data - TODO confirm).
+        final boolean incompleteWrite = result != con.getLastWritePassSize();
+        if (incompleteWrite)
+        {
+            con.createWriteBuffer(con.getTempWriteBuffer());
+        }
+
+        core.getBufferPool().recycleNativeBuffer(con.getTempWriteBuffer());
+        con.setTempWriteBuffer(null);
+        con.setPendingWritting(false);
+        con.executeWriteTask(this);
+    }
+
+    /**
+     * Called when the write operation fails; notifies the client and closes the connection.
+     * @param t the failure cause (not inspected)
+     * @param con the connection the write was issued for
+     */
+    @Override
+    public void failed(Throwable t, MMOConnection<T> con)
+    {
+        dropConnection(con);
+    }
+
+    /**
+     * Notifies the client of a forced disconnection and closes the connection through the core.
+     * @param con the connection to close
+     */
+    private void dropConnection(final MMOConnection<T> con)
+    {
+        con.getClient().onForcedDisconnection();
+        core.closeConnectionImpl(con);
+    }
+}
Index: lib/javolution_LICENSE.txt
===================================================================
--- lib/javolution_LICENSE.txt (revision 6601)
+++ lib/javolution_LICENSE.txt (working copy)
@@ -1,27 +0,0 @@
-/*
- * Javolution - Java(tm) Solution for Real-Time and Embedded Systems
- * Copyright (c) 2005-2009, Javolution (http://javolution.org/)
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
- * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
\ No newline at end of file
Index: lib/javolution-5.5.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Index: src/org/mmocore/network/ReadHandler.java
===================================================================
--- src/org/mmocore/network/ReadHandler.java (revision 0)
+++ src/org/mmocore/network/ReadHandler.java (working copy)
@@ -0,0 +1,47 @@
+package org.mmocore.network;
+
+import java.nio.channels.CompletionHandler;
+
+/**
+ * Completion handler for asynchronous reads on an {@link MMOConnection}.
+ * <p>
+ * A completed read is wrapped in a {@link ReadParseTask} and handed to the core for execution;
+ * a failed read notifies the client and closes the connection.
+ * @author BiggBoss
+ * @param <T> the client type attached to the connections
+ */
+public final class ReadHandler<T extends MMOClient<?>> implements CompletionHandler<Integer, MMOConnection<T>>
+{
+    // Assigned once in the constructor; final guarantees the reference is safely visible to
+    // whichever thread later invokes the completion callbacks.
+    private final Core<T> core;
+
+    ReadHandler(Core<T> corePtr)
+    {
+        core = corePtr;
+    }
+
+    /**
+     * Schedules parsing of the data that was just read.
+     * @param read the value reported for the read operation, passed on to the parse task
+     * @param con the connection the read was issued for
+     */
+    @Override
+    public void completed(Integer read, MMOConnection<T> con)
+    {
+        core.executeReadTask(new ReadParseTask<>(core, con, read));
+    }
+
+    /**
+     * Called when the read operation fails; notifies the client and closes the connection.
+     * @param t the failure cause (not inspected)
+     * @param con the connection the read was issued for
+     */
+    @Override
+    public void failed(Throwable t, MMOConnection<T> con)
+    {
+        // Original author's note: "Second part of read result = -2" (meaning not evident from this file).
+        con.getClient().onForcedDisconnection();
+        core.closeConnectionImpl(con);
+    }
+}
Index: src/org/mmocore/network/AbstractWriteHandler.java
===================================================================
--- src/org/mmocore/network/AbstractWriteHandler.java (revision 0)
+++ src/org/mmocore/network/AbstractWriteHandler.java (working copy)
@@ -0,0 +1,37 @@
+/*
+ * This program is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.mmocore.network;
+
+import java.nio.channels.CompletionHandler;
+
+/**
+ * Common base for write completion handlers: stores the {@link Core} the handler belongs to
+ * and leaves the {@link CompletionHandler} callbacks to subclasses.
+ * @author BiggBoss
+ * @param <T> the client type attached to the connections
+ */
+public abstract class AbstractWriteHandler<T extends MMOClient<?>> implements CompletionHandler<Integer, MMOConnection<T>>
+{
+    /** Core this handler works for; available to subclasses. */
+    protected final Core<T> core;
+
+    /**
+     * @param coreRef the core this handler works for
+     */
+    AbstractWriteHandler(final Core<T> coreRef)
+    {
+        this.core = coreRef;
+    }
+}
Index: src/org/mmocore/network/CoreConfig.java
===================================================================
--- src/org/mmocore/network/CoreConfig.java (revision 0)
+++ src/org/mmocore/network/CoreConfig.java (working copy)
@@ -0,0 +1,71 @@
+/* This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+package org.mmocore.network;
+
+/**
+ * Settings object handed to the network core on construction; all values are plain public fields.
+ * @author KenM
+ */
+public final class CoreConfig
+{
+ public int READ_BUFFER_SIZE = 64 * 1024;
+
+ public int WRITE_BUFFER_SIZE = 64 * 1024;
+
+ public int HELPER_BUFFER_COUNT = 20;
+
+ public int HELPER_BUFFER_SIZE = 64 * 1024;
+
+ /**
+ * Server will try to send MAX_SEND_PER_PASS packets per socket write call<br>
+ * however it may send less if the write buffer was filled before achieving
+ * this value.
+ */
+ public int MAX_SEND_PER_PASS = 10;
+
+ /**
+ * Server will try to read MAX_READ_PER_PASS packets per socket read call<br>
+ * however it may read less if the read buffer was empty before achieving
+ * this value.
+ */
+ public int MAX_READ_PER_PASS = 10;
+
+ /**
+ * Defines how long (in milliseconds) the selector should sleep; a higher
+ * value increases throughput but also increases latency (to a max of the
+ * sleep value itself).<BR>
+ * Also an extremely high value (usually > 100) will decrease throughput due
+ * to the server not doing enough sends per second (depends on max sends per
+ * pass).<BR>
+ * <BR>
+ * Recommended values:<BR>
+ * 1 for minimal latency.<BR>
+ * 10-30 for a latency/throughput trade-off based on your needs.<BR>
+ */
+ public int SLEEP_TIME = 10;
+
+ /**
+ * Settings added for the asynchronous core (thread and buffer pool sizing, judging by the names; TODO confirm against Core).
+ */
+ public int ASYNC_THREAD_POOL_SIZE = 4;
+ public int WORKERS_THREAD_POOL_SIZE = 4;
+ public int NATIVE_BUF_POOL_SIZE = 4;
+ public int NATIVE_BUF_SIZE = 64 * 1024;
+ public int STRING_BUF_POOL_SIZE = 4;
+ public int STRING_BUF_SIZE = 64 * 1024;
+}
Index: src/org/mmocore/network/ReadParseTask.java
===================================================================
--- src/org/mmocore/network/ReadParseTask.java (revision 0)
+++ src/org/mmocore/network/ReadParseTask.java (working copy)
@@ -0,0 +1,202 @@
+package org.mmocore.network;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Task that parses the packets found in a connection's read buffer once a read has completed
+ * and then starts the next read on that connection.
+ * @author KenM
+ * @author BiggBoss
+ * @param <T> the client type attached to the connection
+ */
+public final class ReadParseTask<T extends MMOClient<?>> implements Runnable
+{
+    /** Maximum number of packets parsed per run; not assigned inside this class. */
+    public static int MAX_READ_PER_PASS;
+    /** Header length subtracted from a packet's declared size; not assigned inside this class. */
+    public static int HEADER_SIZE;
+
+    private final Core<T> core;
+    private final BufferPool pool;
+
+    private final MMOConnection<T> con;
+    private final int readBytes;
+
+    /**
+     * @param coreRef the core used to dispatch packets and close connections
+     * @param conn the connection whose read buffer is parsed
+     * @param read the result reported by the completed read operation
+     */
+    ReadParseTask(final Core<T> coreRef, final MMOConnection<T> conn, final int read)
+    {
+        core = coreRef;
+        con = conn;
+        pool = core.getBufferPool();
+        readBytes = read;
+    }
+
+    /**
+     * Parses the received data, then starts the next read on the connection. If starting the
+     * read throws, the client is notified and the connection is closed.
+     */
+    @Override
+    public void run()
+    {
+        readPackets(readBytes, con);
+
+        try
+        {
+            con.startReadTask(core.getReadCompletionHandler());
+        }
+        catch (Exception e)
+        {
+            con.getClient().onForcedDisconnection();
+            core.closeConnectionImpl(con);
+        }
+    }
+
+    /**
+     * Parses up to {@link #MAX_READ_PER_PASS} packets from the connection's read buffer.
+     * @param result the result of the completed read; zero or less closes the connection
+     * @param con the connection being read
+     */
+    private void readPackets(int result, MMOConnection<T> con)
+    {
+        if (con.isClosed())
+        {
+            return;
+        }
+
+        final ByteBuffer buf = con.getReadBuffer();
+
+        // A read buffer with no free space cannot take any more data. This used to call
+        // System.exit(0), which let a single connection terminate the whole server process.
+        // Only the affected connection is closed now, exactly like the non-positive result case.
+        if (buf.position() == buf.limit())
+        {
+            core.closeConnectionImpl(con);
+            return;
+        }
+
+        if (result <= 0)
+        {
+            core.closeConnectionImpl(con);
+            return;
+        }
+
+        buf.flip();
+
+        final T client = con.getClient();
+
+        for (int i = 0; i < MAX_READ_PER_PASS; i++)
+        {
+            if (!tryReadPacket(client, buf, con))
+            {
+                return;
+            }
+        }
+
+        // Only reachable once MAX_READ_PER_PASS packets were handled: keep any unread bytes
+        // for the next pass, otherwise hand the buffer back to the pool.
+        if (buf.remaining() > 0)
+        {
+            // move the unread bytes to the beginning of the buffer
+            buf.compact();
+        }
+        else
+        {
+            pool.recycleBuffer(buf);
+            con.setReadBuffer(null);
+        }
+    }
+
+    /**
+     * Tries to consume one packet from the buffer.
+     * @param client the client the packet belongs to
+     * @param buf the read buffer, positioned at the start of the next packet header
+     * @param con the connection owning the buffer
+     * @return {@code true} if a packet was consumed and more data remains, {@code false} if
+     *         parsing must stop (buffer drained or more data is needed)
+     */
+    private boolean tryReadPacket(T client, ByteBuffer buf, MMOConnection<T> con)
+    {
+        switch (buf.remaining())
+        {
+            case 0:
+                // nothing left to read
+                return false;
+            case 1:
+                // not enough data for the size header yet: keep the byte and wait for more
+                buf.compact();
+                return false;
+            default:
+                // body size, i.e. the declared size without the header
+                final int dataPending = (buf.getShort() & 0xFFFF) - HEADER_SIZE;
+
+                // is the whole body already in the buffer?
+                if (dataPending <= buf.remaining())
+                {
+                    // avoid parsing dummy packets (packets without body)
+                    if (dataPending > 0)
+                    {
+                        final int pos = buf.position();
+                        parseClientPacket(pos, buf, dataPending, client);
+                        buf.position(pos + dataPending);
+                    }
+
+                    // the buffer is fully consumed: release it
+                    if (!buf.hasRemaining())
+                    {
+                        con.setReadBuffer(null);
+                        pool.recycleBuffer(buf);
+                        return false;
+                    }
+                    return true;
+                }
+
+                // body incomplete: rewind to the header and keep the data for the next read
+                buf.position(buf.position() - HEADER_SIZE);
+                buf.compact();
+                return false;
+        }
+    }
+
+    /**
+     * Decrypts one packet body and, if the core returns a packet for it, reads it and submits it for execution.
+     * @param pos buffer position where the packet body starts
+     * @param buf the read buffer
+     * @param dataSize size of the packet body in bytes
+     * @param client the client that sent the packet
+     */
+    private void parseClientPacket(int pos, ByteBuffer buf, int dataSize, T client)
+    {
+        final boolean ret = client.decrypt(buf, dataSize);
+
+        if (ret && buf.hasRemaining())
+        {
+            // restrict the readable area to this packet's body
+            final int limit = buf.limit();
+            buf.limit(pos + dataSize);
+            final ReceivablePacket<T> cp = core.handlePacket(buf, client);
+
+            if (cp != null)
+            {
+                cp._buf = buf;
+                cp._sbuf = pool.getPooledStringBuffer();
+                cp._client = client;
+
+                if (cp.read())
+                {
+                    core.executePacket(cp);
+                }
+
+                pool.recycleStringBuffer(cp._sbuf);
+
+                cp._buf = null;
+                cp._sbuf = null;
+            }
+            // restore the original limit for the following packets
+            buf.limit(limit);
+        }
+    }
+}
Index: .classpath
===================================================================
--- .classpath (revision 6601)
+++ .classpath (working copy)
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry kind="src" path="src" />
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER" />
- <classpathentry kind="lib" path="lib/javolution-5.5.1.jar" />
- <classpathentry kind="output" path="bin" />
-</classpath>
\ No newline at end of file
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
Index: src/org/mmocore/network/SelectorConfig.java
===================================================================
--- src/org/mmocore/network/SelectorConfig.java (revision 6601)
+++ src/org/mmocore/network/SelectorConfig.java (working copy)
@@ -1,64 +0,0 @@
-/* This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2, or (at your option)
- * any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
- *
- * http://www.gnu.org/copyleft/gpl.html
- */
-package org.mmocore.network;
-
-/**
- * @author KenM
- */
-public final class SelectorConfig
-{
- public int READ_BUFFER_SIZE = 64 * 1024;
-
- public int WRITE_BUFFER_SIZE = 64 * 1024;
-
- public int HELPER_BUFFER_COUNT = 20;
-
- public int HELPER_BUFFER_SIZE = 64 * 1024;
-
- /**
- * Server will try to send MAX_SEND_PER_PASS packets per socket write call<br>
- * however it may send less if the write buffer was filled before achieving this value.
- */
- public int MAX_SEND_PER_PASS = 10;
-
- /**
- * Server will try to read MAX_READ_PER_PASS packets per socket read call<br>
- * however it may read less if the read buffer was empty before achieving this value.
- */
- public int MAX_READ_PER_PASS = 10;
-
- /**
- * Defines how much time (in milis) should the selector sleep, an higher value increases throughput but also increases latency(to a max of the sleep value itself).<BR>
- * Also an extremely high value(usually > 100) will decrease throughput due to the server not doing enough sends per second (depends on max sends per pass).<BR>
- * <BR>
- * Recommended values:<BR>
- * 1 for minimal latency.<BR>
- * 10-30 for an latency/troughput trade-off based on your needs.<BR>
- */
- public int SLEEP_TIME = 10;
-
- /**
- * Used to enable/disable TCP_NODELAY which disable/enable Nagle's algorithm.<BR>
- * <BR>
- * Nagle's algorithm try to conserve bandwidth by minimizing the number of segments that are sent. When applications wish to decrease network latency and increase performance, they can disable Nagle's algorithm (that is enable TCP_NODELAY). Data will be sent earlier, at the cost of an increase
- * in bandwidth consumption. The Nagle's algorithm is described in RFC 896.<BR>
- * <BR>
- * Summary, data will be sent earlier, thus lowering the ping, at the cost of a small increase in bandwidth consumption.
- */
- public boolean TCP_NODELAY = false;
-}
Index: src/org/mmocore/network/BufferPool.java
===================================================================
--- src/org/mmocore/network/BufferPool.java (revision 0)
+++ src/org/mmocore/network/BufferPool.java (working copy)
@@ -0,0 +1,179 @@
+package org.mmocore.network;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.LinkedList;
+
+/**
+ * Holds three bounded pools of reusable buffers: direct byte buffers, array-backed byte
+ * buffers and {@link NioNetStringBuffer}s.
+ * <p>
+ * Every pool is filled up to its configured size on construction. Taking from an empty pool
+ * allocates a new buffer; recycling into a pool that is already at its configured size drops
+ * the buffer. Each pool is accessed under its own monitor.
+ * @author BiggBoss
+ */
+final class BufferPool
+{
+    private final ByteOrder BYTE_ORDER;
+
+    private final int nativeBufferPoolSize;
+    private final int nativeBufferSize;
+    private final int bufferPoolSize;
+    private final int bufferSize;
+    private final int stringBufferPoolSize;
+    private final int stringBufferSize;
+
+    private final LinkedList<ByteBuffer> nativePool = new LinkedList<>();
+    private final LinkedList<ByteBuffer> pool = new LinkedList<>();
+    private final LinkedList<NioNetStringBuffer> stringPool = new LinkedList<>();
+
+    /**
+     * Creates the pools and pre-fills each of them.
+     * @param byteOrder byte order applied to every byte buffer created by this pool
+     * @param nativePoolSize number of direct buffers kept
+     * @param nativeBufSize capacity of each direct buffer
+     * @param poolSize number of array-backed buffers kept
+     * @param bufferSiz capacity of each array-backed buffer
+     * @param stringPoolSize number of string buffers kept
+     * @param stringBufSize size passed to each {@link NioNetStringBuffer}
+     */
+    BufferPool(final ByteOrder byteOrder, final int nativePoolSize, final int nativeBufSize, final int poolSize, final int bufferSiz, final int stringPoolSize, final int stringBufSize)
+    {
+        BYTE_ORDER = byteOrder;
+        nativeBufferPoolSize = nativePoolSize;
+        nativeBufferSize = nativeBufSize;
+        bufferPoolSize = poolSize;
+        bufferSize = bufferSiz;
+        stringBufferPoolSize = stringPoolSize;
+        stringBufferSize = stringBufSize;
+
+        for (int i = 0; i < nativeBufferPoolSize; i++)
+        {
+            nativePool.add(newNativeBuffer());
+        }
+        for (int i = 0; i < bufferPoolSize; i++)
+        {
+            pool.add(newHeapBuffer());
+        }
+        for (int i = 0; i < stringBufferPoolSize; i++)
+        {
+            stringPool.add(new NioNetStringBuffer(stringBufferSize));
+        }
+    }
+
+    /**
+     * @return a direct buffer from the pool, or a newly allocated one if the pool is empty
+     */
+    ByteBuffer getPooledNativeBuffer()
+    {
+        synchronized (nativePool)
+        {
+            // poll() yields null on an empty list; size the pool so this fallback stays rare
+            final ByteBuffer pooled = nativePool.poll();
+            return (pooled != null) ? pooled : newNativeBuffer();
+        }
+    }
+
+    /**
+     * Clears the buffer and puts it back unless the pool is at its configured size.
+     * @param buffer the direct buffer to return; {@code null} is ignored
+     */
+    void recycleNativeBuffer(ByteBuffer buffer)
+    {
+        if (buffer == null)
+        {
+            return;
+        }
+
+        synchronized (nativePool)
+        {
+            if (nativePool.size() >= nativeBufferPoolSize)
+            {
+                return;
+            }
+            buffer.clear();
+            nativePool.addLast(buffer);
+        }
+    }
+
+    /**
+     * @return an array-backed buffer from the pool, or a newly allocated one if the pool is empty
+     */
+    ByteBuffer getPooledBuffer()
+    {
+        synchronized (pool)
+        {
+            final ByteBuffer pooled = pool.poll();
+            return (pooled != null) ? pooled : newHeapBuffer();
+        }
+    }
+
+    /**
+     * Clears the buffer and puts it back unless the pool is at its configured size.
+     * @param buffer the array-backed buffer to return; {@code null} is ignored
+     */
+    void recycleBuffer(ByteBuffer buffer)
+    {
+        if (buffer == null)
+        {
+            return;
+        }
+
+        synchronized (pool)
+        {
+            if (pool.size() >= bufferPoolSize)
+            {
+                return;
+            }
+            buffer.clear();
+            pool.addLast(buffer);
+        }
+    }
+
+    /**
+     * @return a string buffer from the pool, or a newly created one if the pool is empty
+     */
+    NioNetStringBuffer getPooledStringBuffer()
+    {
+        synchronized (stringPool)
+        {
+            final NioNetStringBuffer pooled = stringPool.poll();
+            return (pooled != null) ? pooled : new NioNetStringBuffer(stringBufferSize);
+        }
+    }
+
+    /**
+     * Clears the string buffer and puts it back unless the pool is at its configured size.
+     * @param stringBuffer the string buffer to return; {@code null} is ignored
+     */
+    void recycleStringBuffer(NioNetStringBuffer stringBuffer)
+    {
+        if (stringBuffer == null)
+        {
+            return;
+        }
+
+        synchronized (stringPool)
+        {
+            if (stringPool.size() >= stringBufferPoolSize)
+            {
+                return;
+            }
+            stringBuffer.clear();
+            stringPool.addLast(stringBuffer);
+        }
+    }
+
+    /** Allocates a direct buffer of the configured size and byte order. */
+    private ByteBuffer newNativeBuffer()
+    {
+        return ByteBuffer.allocateDirect(nativeBufferSize).order(BYTE_ORDER);
+    }
+
+    /** Allocates an array-backed buffer of the configured size and byte order. */
+    private ByteBuffer newHeapBuffer()
+    {
+        return ByteBuffer.wrap(new byte[bufferSize]).order(BYTE_ORDER);
+    }
+}
Index: src/org/mmocore/network/WriteParseTask.java
===================================================================
--- src/org/mmocore/network/WriteParseTask.java (revision 0)
+++ src/org/mmocore/network/WriteParseTask.java (working copy)
@@ -0,0 +1,203 @@
+package org.mmocore.network;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Task that serialises queued packets of one connection into a direct buffer and starts an
+ * asynchronous write of that buffer.
+ * <p>
+ * Both working buffers are taken from the core's buffer pool when the task is created.
+ * NOTE(review): the buffer handling assumes a task instance is run at most once - confirm
+ * against the code that schedules these tasks.
+ * @author BiggBoss
+ * @param <T> the client type attached to the connection
+ */
+public final class WriteParseTask<T extends MMOClient<?>> implements Runnable
+{
+    /** Maximum number of queued packets serialised per run; not assigned inside this class. */
+    public static int MAX_SEND_PER_PASS;
+    /** Space reserved in front of each packet body for its size header; not assigned inside this class. */
+    public static int HEADER_SIZE;
+
+    private final MMOConnection<T> con;
+    private final Core<T> core;
+    private final AbstractWriteHandler<T> handler;
+    private final ByteBuffer DIRECT_WRITE_BUFFER;
+    private final ByteBuffer WRITE_BUFFER;
+
+    /**
+     * @param conn the connection whose send queue is written
+     * @param corePtr the core providing the buffer pool
+     * @param abstractHandler completion handler passed to the asynchronous write
+     */
+    WriteParseTask(final MMOConnection<T> conn, final Core<T> corePtr, AbstractWriteHandler<T> abstractHandler)
+    {
+        con = conn;
+        core = corePtr;
+        handler = abstractHandler;
+
+        DIRECT_WRITE_BUFFER = core.getBufferPool().getPooledNativeBuffer();
+        WRITE_BUFFER = core.getBufferPool().getPooledBuffer();
+    }
+
+    /**
+     * Creates a task that uses the core's default write completion handler.
+     * @param conn the connection whose send queue is written
+     * @param corePtr the core providing the buffer pool and the handler
+     */
+    WriteParseTask(final MMOConnection<T> conn, final Core<T> corePtr)
+    {
+        this(conn, corePtr, corePtr.getWriteCompletionHandler());
+    }
+
+    /** Runs {@link #writePacket(MMOConnection)}; any exception is printed and not rethrown. */
+    @Override
+    public void run()
+    {
+        try
+        {
+            writePacket(con);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Fills the direct buffer and starts the asynchronous write. If starting the write throws,
+     * the client is notified and the connection is closed.
+     * @param con the connection to write to
+     */
+    void writePacket(MMOConnection<T> con)
+    {
+        if (!prepareWriteBuffer(con))
+        {
+            // Nothing to send: the direct buffer was never handed to the connection, so return
+            // it to the pool instead of dropping a pooled direct buffer on every idle run.
+            core.getBufferPool().recycleNativeBuffer(DIRECT_WRITE_BUFFER);
+            return;
+        }
+
+        DIRECT_WRITE_BUFFER.flip();
+
+        final int size = DIRECT_WRITE_BUFFER.remaining();
+
+        try
+        {
+            con.setLastWritePassSize(size);
+            con.setTempWriteBuffer(DIRECT_WRITE_BUFFER);
+            con.startWriteTask(DIRECT_WRITE_BUFFER, handler);
+        }
+        catch (Exception e)
+        {
+            con.getClient().onForcedDisconnection();
+            core.closeConnectionImpl(con);
+        }
+    }
+
+    /**
+     * Moves pending data and up to {@link #MAX_SEND_PER_PASS} queued packets into the direct buffer.
+     * @param con the connection whose data is prepared
+     * @return {@code true} if the direct buffer received pending data or at least one packet was
+     *         taken from the send queue
+     */
+    private boolean prepareWriteBuffer(final MMOConnection<T> con)
+    {
+        boolean hasPending = false;
+        // Set when the staging buffer is given to createWriteBuffer; in that case it is not
+        // recycled here, matching the previous behaviour of that branch.
+        boolean stagingHandedOff = false;
+        DIRECT_WRITE_BUFFER.clear();
+
+        // if there is pending content add it
+        if (con.hasPendingWriteBuffer())
+        {
+            con.movePendingWriteBufferTo(DIRECT_WRITE_BUFFER);
+            hasPending = true;
+        }
+
+        if ((DIRECT_WRITE_BUFFER.remaining() > 1) && !con.hasPendingWriteBuffer())
+        {
+            final NioNetStackList<SendablePacket<T>> sendQueue = con.getSendQueue();
+            final T client = con.getClient();
+            SendablePacket<T> sp;
+
+            for (int i = 0; i < MAX_SEND_PER_PASS; i++)
+            {
+                synchronized (con.getSendQueue())
+                {
+                    sp = sendQueue.isEmpty() ? null : sendQueue.removeFirst();
+                }
+
+                if (sp == null)
+                {
+                    break;
+                }
+
+                hasPending = true;
+
+                // serialise the packet into the staging buffer
+                putPacketIntoWriteBuffer(client, sp);
+
+                WRITE_BUFFER.flip();
+
+                if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
+                {
+                    DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
+                }
+                else
+                {
+                    // does not fit any more: leave it with the connection for a later pass
+                    con.createWriteBuffer(WRITE_BUFFER);
+                    stagingHandedOff = true;
+                    break;
+                }
+            }
+        }
+
+        // The staging buffer used to be recycled inside the loop after every packet, i.e. it was
+        // put back into the shared pool (possibly several times) while this task kept writing
+        // to it. It is now returned exactly once, after its last use.
+        if (!stagingHandedOff)
+        {
+            core.getBufferPool().recycleBuffer(WRITE_BUFFER);
+        }
+        return hasPending;
+    }
+
+    /**
+     * Writes one packet (size header followed by the encrypted body) into the staging buffer.
+     * @param client the client whose {@code encrypt} is applied to the body
+     * @param sp the packet to serialise
+     */
+    private void putPacketIntoWriteBuffer(final T client, final SendablePacket<T> sp)
+    {
+        WRITE_BUFFER.clear();
+
+        // reserve space for the size
+        final int headerPos = WRITE_BUFFER.position();
+        final int dataPos = headerPos + HEADER_SIZE;
+        WRITE_BUFFER.position(dataPos);
+
+        // set the write buffer
+        sp._buf = WRITE_BUFFER;
+        // write content to buffer
+        sp.write();
+        // delete the write buffer
+        sp._buf = null;
+
+        // size of the body only (the header is added when the size is written)
+        int dataSize = WRITE_BUFFER.position() - dataPos;
+        WRITE_BUFFER.position(dataPos);
+        client.encrypt(WRITE_BUFFER, dataSize);
+
+        // recalculate size after encryption
+        dataSize = WRITE_BUFFER.position() - dataPos;
+
+        WRITE_BUFFER.position(headerPos);
+        // write header
+        WRITE_BUFFER.putShort((short) (dataSize + HEADER_SIZE));
+        WRITE_BUFFER.position(dataPos + dataSize);
+    }
+}
Index: src/org/mmocore/network/ConnectionHandler.java
===================================================================
--- src/org/mmocore/network/ConnectionHandler.java (revision 0)
+++ src/org/mmocore/network/ConnectionHandler.java (working copy)
@@ -0,0 +1,30 @@
+package org.mmocore.network;
+
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+
+/** Accept completion handler that forwards each outcome to the {@link ConnectionManager} it was created with.
+ * @author BiggBoss
+ * @param <T> client type of the manager this handler reports to
+ */
+public final class ConnectionHandler<T extends MMOClient<?>> implements CompletionHandler<AsynchronousSocketChannel, Void>
+{
+ private final ConnectionManager<T> conManager;
+
+ ConnectionHandler(final ConnectionManager<T> manager)
+ {
+ conManager = manager;
+ }
+
+ @Override
+ public synchronized void completed(AsynchronousSocketChannel result, Void attachment)
+ {
+ conManager.acceptConnection(result);
+ }
+
+ @Override
+ public synchronized void failed(Throwable exc, Void attachment)
+ {
+ conManager.failedAcceptConnection(exc);
+ }
+}
Index: src/org/mmocore/network/ConnectionManager.java
===================================================================
--- src/org/mmocore/network/ConnectionManager.java (revision 0)
+++ src/org/mmocore/network/ConnectionManager.java (working copy)
@@ -0,0 +1,126 @@
+package org.mmocore.network;
+
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+
+/**
+ * Issues one asynchronous accept at a time on the server channel and parks the calling (acceptor)
+ * thread until {@link ConnectionHandler} reports that accept as completed or failed.
+ * @author BiggBoss
+ * @param <T> client type created for every accepted connection
+ */
+public final class ConnectionManager<T extends MMOClient<?>>
+{
+ private final AsynchronousServerSocketChannel server;
+ private final ConnectionHandler<T> conHandler;
+ private final Core<T> core;
+ // true while an accept is outstanding; only read and written inside synchronized methods
+ private boolean acceptPending;
+
+ ConnectionManager(final AsynchronousServerSocketChannel serv, final Core<T> coreRef)
+ {
+ server = serv;
+ conHandler = new ConnectionHandler<>(this);
+ core = coreRef;
+ }
+
+ /**
+ * Starts the thread that runs a {@link ConnectionAcceptor} for this manager.
+ */
+ void init()
+ {
+ new Thread(new ConnectionAcceptor<>(core, this)).start();
+ }
+
+ /**
+ * Issues an accept unless one is still outstanding, then blocks until
+ * {@link #acceptConnection} or {@link #failedAcceptConnection} reports its outcome.
+ */
+ synchronized void tryAcceptConnection()
+ {
+ try
+ {
+ // a second accept() while one is outstanding throws AcceptPendingException
+ if (!acceptPending)
+ {
+ server.accept(null, conHandler);
+ acceptPending = true;
+ }
+ // Object.wait() may return spuriously, so re-check the flag instead of trusting the wake-up
+ while (acceptPending)
+ {
+ wait();
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Turns an accepted channel into an {@link MMOConnection} and starts reading from it. The channel
+ * is closed when the accept filter rejects it or when setup throws; the acceptor is then released.
+ * @param socket the newly accepted channel
+ */
+ synchronized void acceptConnection(final AsynchronousSocketChannel socket)
+ {
+ boolean handedOver = false;
+ try
+ {
+ // no filter configured means accept everything, as the removed SelectorThread did
+ if((core.getAcceptFilter() == null) || core.getAcceptFilter().accept(socket))
+ {
+ socket.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
+ socket.setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);
+ InetSocketAddress isaddr = (InetSocketAddress)socket.getRemoteAddress();
+ MMOConnection<T> con = new MMOConnection<>(core, core.getBufferPool(), socket, isaddr.getAddress(), isaddr.getPort());
+ con.setClient(core.getClientFactory().create(con));
+ con.startReadTask(core.getReadCompletionHandler());
+ handedOver = true;
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (!handedOver)
+ {
+ closeQuietly(socket);
+ }
+ acceptPending = false;
+ notify();
+ }
+ }
+
+ /**
+ * Reports a failed accept and releases the acceptor thread so that it can try again.
+ * @param t the failure reported for the accept operation
+ */
+ synchronized void failedAcceptConnection(final Throwable t)
+ {
+ t.printStackTrace();
+ acceptPending = false;
+ notify();
+ }
+
+ /**
+ * Closes a channel that will not be used, ignoring any error raised by the close itself.
+ * @param socket the channel to discard
+ */
+ private static void closeQuietly(final AsynchronousSocketChannel socket)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ // ignore, the channel is being discarded anyway
+ }
+ }
+}
Index: src/org/mmocore/network/ConnectionCloseHandler.java
===================================================================
--- src/org/mmocore/network/ConnectionCloseHandler.java (revision 0)
+++ src/org/mmocore/network/ConnectionCloseHandler.java (working copy)
@@ -0,0 +1,33 @@
+package org.mmocore.network;
+
+/** Write completion handler whose callbacks currently do nothing (the close calls below are commented out).
+ * @author BiggBoss
+ * @param <T> client type of the connections passed as attachment
+ */
+public final class ConnectionCloseHandler<T extends MMOClient<?>> extends AbstractWriteHandler<T>
+{
+ ConnectionCloseHandler(final Core<T> corePtr)
+ {
+ super(corePtr);
+ }
+
+ /* (non-Javadoc)
+ * @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public void completed(Integer result, MMOConnection<T> con)
+ {
+ //core.closeConnectionImpl(con); // NOTE(review): disabled, so this handler never closes con after the write - confirm that is intended
+ }
+
+ /* (non-Javadoc)
+ * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
+ */
+ @Override
+ public void failed(Throwable t, MMOConnection<T> con)
+ {
+ // TODO: t is dropped here without being logged or rethrown
+ //core.closeConnectionImpl(con); // NOTE(review): disabled as well, con stays open after a failed write
+ }
+
+}
Index: src/org/mmocore/network/SelectorThread.java
===================================================================
--- src/org/mmocore/network/SelectorThread.java (revision 6601)
+++ src/org/mmocore/network/SelectorThread.java (working copy)
@@ -1,704 +0,0 @@
-/* This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2, or (at your option)
- * any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
- *
- * http://www.gnu.org/copyleft/gpl.html
- */
-package org.mmocore.network;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import javolution.util.FastList;
-
-/**
- * @author KenM<BR>
- * Parts of design based on networkcore from WoodenGil
- * @param <T>
- */
-public final class SelectorThread<T extends MMOClient<?>> extends Thread
-{
- // default BYTE_ORDER
- private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
- // default HEADER_SIZE
- private static final int HEADER_SIZE = 2;
- // Selector
- private final Selector _selector;
- // Implementations
- private final IPacketHandler<T> _packetHandler;
- private final IMMOExecutor<T> _executor;
- private final IClientFactory<T> _clientFactory;
- private final IAcceptFilter _acceptFilter;
- // Configurations
- private final int HELPER_BUFFER_SIZE;
- private final int HELPER_BUFFER_COUNT;
- private final int MAX_SEND_PER_PASS;
- private final int MAX_READ_PER_PASS;
- private final long SLEEP_TIME;
- public boolean TCP_NODELAY;
- // Main Buffers
- private final ByteBuffer DIRECT_WRITE_BUFFER;
- private final ByteBuffer WRITE_BUFFER;
- private final ByteBuffer READ_BUFFER;
- // String Buffer
- private final NioNetStringBuffer STRING_BUFFER;
- // ByteBuffers General Purpose Pool
- private final FastList<ByteBuffer> _bufferPool;
- // Pending Close
- private final NioNetStackList<MMOConnection<T>> _pendingClose;
-
- private boolean _shutdown;
-
- public SelectorThread(final SelectorConfig sc, final IMMOExecutor<T> executor, final IPacketHandler<T> packetHandler, final IClientFactory<T> clientFactory, final IAcceptFilter acceptFilter) throws IOException
- {
- super.setName("SelectorThread-" + super.getId());
-
- HELPER_BUFFER_SIZE = sc.HELPER_BUFFER_SIZE;
- HELPER_BUFFER_COUNT = sc.HELPER_BUFFER_COUNT;
- MAX_SEND_PER_PASS = sc.MAX_SEND_PER_PASS;
- MAX_READ_PER_PASS = sc.MAX_READ_PER_PASS;
- SLEEP_TIME = sc.SLEEP_TIME;
- TCP_NODELAY = sc.TCP_NODELAY;
-
- DIRECT_WRITE_BUFFER = ByteBuffer.allocateDirect(sc.WRITE_BUFFER_SIZE).order(BYTE_ORDER);
- WRITE_BUFFER = ByteBuffer.wrap(new byte[sc.WRITE_BUFFER_SIZE]).order(BYTE_ORDER);
- READ_BUFFER = ByteBuffer.wrap(new byte[sc.READ_BUFFER_SIZE]).order(BYTE_ORDER);
-
- STRING_BUFFER = new NioNetStringBuffer(64 * 1024);
-
- _pendingClose = new NioNetStackList<>();
- _bufferPool = new FastList<>();
-
- for (int i = 0; i < HELPER_BUFFER_COUNT; i++)
- {
- _bufferPool.addLast(ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER));
- }
-
- _acceptFilter = acceptFilter;
- _packetHandler = packetHandler;
- _clientFactory = clientFactory;
- _executor = executor;
- _selector = Selector.open();
- }
-
- public final void openServerSocket(InetAddress address, int tcpPort) throws IOException
- {
- ServerSocketChannel selectable = ServerSocketChannel.open();
- selectable.configureBlocking(false);
-
- ServerSocket ss = selectable.socket();
-
- if (address == null)
- {
- ss.bind(new InetSocketAddress(tcpPort));
- }
- else
- {
- ss.bind(new InetSocketAddress(address, tcpPort));
- }
-
- selectable.register(_selector, SelectionKey.OP_ACCEPT);
- }
-
- final ByteBuffer getPooledBuffer()
- {
- if (_bufferPool.isEmpty())
- {
- return ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER);
- }
-
- return _bufferPool.removeFirst();
- }
-
- final void recycleBuffer(final ByteBuffer buf)
- {
- if (_bufferPool.size() < HELPER_BUFFER_COUNT)
- {
- buf.clear();
- _bufferPool.addLast(buf);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public final void run()
- {
- int selectedKeysCount = 0;
-
- SelectionKey key;
- MMOConnection<T> con;
-
- Iterator<SelectionKey> selectedKeys;
-
- while (!_shutdown)
- {
- try
- {
- selectedKeysCount = _selector.selectNow();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- if (selectedKeysCount > 0)
- {
- selectedKeys = _selector.selectedKeys().iterator();
-
- while (selectedKeys.hasNext())
- {
- key = selectedKeys.next();
- selectedKeys.remove();
-
- con = (MMOConnection<T>) key.attachment();
-
- switch (key.readyOps())
- {
- case SelectionKey.OP_CONNECT:
- finishConnection(key, con);
- break;
- case SelectionKey.OP_ACCEPT:
- acceptConnection(key, con);
- break;
- case SelectionKey.OP_READ:
- readPacket(key, con);
- break;
- case SelectionKey.OP_WRITE:
- writePacket(key, con);
- break;
- case SelectionKey.OP_READ | SelectionKey.OP_WRITE:
- writePacket(key, con);
- if (key.isValid())
- {
- readPacket(key, con);
- }
- break;
- }
- }
- }
-
- synchronized (_pendingClose)
- {
- while (!_pendingClose.isEmpty())
- {
- try
- {
- con = _pendingClose.removeFirst();
- writeClosePacket(con);
- closeConnectionImpl(con.getSelectionKey(), con);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
- try
- {
- Thread.sleep(SLEEP_TIME);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- closeSelectorThread();
- }
-
- private final void finishConnection(final SelectionKey key, final MMOConnection<T> con)
- {
- try
- {
- ((SocketChannel) key.channel()).finishConnect();
- }
- catch (IOException e)
- {
- con.getClient().onForcedDisconnection();
- closeConnectionImpl(key, con);
- }
-
- // key might have been invalidated on finishConnect()
- if (key.isValid())
- {
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
- }
- }
-
- private final void acceptConnection(final SelectionKey key, MMOConnection<T> con)
- {
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- SocketChannel sc;
-
- try
- {
- while ((sc = ssc.accept()) != null)
- {
- if ((_acceptFilter == null) || _acceptFilter.accept(sc))
- {
- sc.configureBlocking(false);
- SelectionKey clientKey = sc.register(_selector, SelectionKey.OP_READ);
- con = new MMOConnection<>(this, sc.socket(), clientKey, TCP_NODELAY);
- con.setClient(_clientFactory.create(con));
- clientKey.attach(con);
- }
- else
- {
- sc.socket().close();
- }
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- private final void readPacket(final SelectionKey key, final MMOConnection<T> con)
- {
- if (!con.isClosed())
- {
- ByteBuffer buf;
- if ((buf = con.getReadBuffer()) == null)
- {
- buf = READ_BUFFER;
- }
-
- // if we try to to do a read with no space in the buffer it will
- // read 0 bytes
- // going into infinite loop
- if (buf.position() == buf.limit())
- {
- System.exit(0);
- }
-
- int result = -2;
-
- try
- {
- result = con.read(buf);
- }
- catch (IOException e)
- {
- // error handling goes bellow
- }
-
- if (result > 0)
- {
- buf.flip();
-
- final T client = con.getClient();
-
- for (int i = 0; i < MAX_READ_PER_PASS; i++)
- {
- if (!tryReadPacket(key, client, buf, con))
- {
- return;
- }
- }
-
- // only reachable if MAX_READ_PER_PASS has been reached
- // check if there are some more bytes in buffer
- // and allocate/compact to prevent content lose.
- if (buf.remaining() > 0)
- {
- // did we use the READ_BUFFER ?
- if (buf == READ_BUFFER)
- {
- // move the pending byte to the connections READ_BUFFER
- allocateReadBuffer(con);
- }
- else
- {
- // move the first byte to the beginning :)
- buf.compact();
- }
- }
- }
- else
- {
- switch (result)
- {
- case 0:
- case -1:
- closeConnectionImpl(key, con);
- break;
- case -2:
- con.getClient().onForcedDisconnection();
- closeConnectionImpl(key, con);
- break;
- }
- }
- }
- }
-
- private final boolean tryReadPacket(final SelectionKey key, final T client, final ByteBuffer buf, final MMOConnection<T> con)
- {
- switch (buf.remaining())
- {
- case 0:
- // buffer is full
- // nothing to read
- return false;
- case 1:
- // we don`t have enough data for header so we need to read
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-
- // did we use the READ_BUFFER ?
- if (buf == READ_BUFFER)
- {
- // move the pending byte to the connections READ_BUFFER
- allocateReadBuffer(con);
- }
- else
- {
- // move the first byte to the beginning :)
- buf.compact();
- }
- return false;
- default:
- // data size excluding header size :>
- final int dataPending = (buf.getShort() & 0xFFFF) - HEADER_SIZE;
-
- // do we got enough bytes for the packet?
- if (dataPending <= buf.remaining())
- {
- // avoid parsing dummy packets (packets without body)
- if (dataPending > 0)
- {
- final int pos = buf.position();
- parseClientPacket(pos, buf, dataPending, client);
- buf.position(pos + dataPending);
- }
-
- // if we are done with this buffer
- if (!buf.hasRemaining())
- {
- if (buf != READ_BUFFER)
- {
- con.setReadBuffer(null);
- recycleBuffer(buf);
- }
- else
- {
- READ_BUFFER.clear();
- }
- return false;
- }
- return true;
- }
-
- // we don`t have enough bytes for the dataPacket so we need
- // to read
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-
- // did we use the READ_BUFFER ?
- if (buf == READ_BUFFER)
- {
- // move it`s position
- buf.position(buf.position() - HEADER_SIZE);
- // move the pending byte to the connections READ_BUFFER
- allocateReadBuffer(con);
- }
- else
- {
- buf.position(buf.position() - HEADER_SIZE);
- buf.compact();
- }
- return false;
- }
- }
-
- private final void allocateReadBuffer(final MMOConnection<T> con)
- {
- con.setReadBuffer(getPooledBuffer().put(READ_BUFFER));
- READ_BUFFER.clear();
- }
-
- private final void parseClientPacket(final int pos, final ByteBuffer buf, final int dataSize, final T client)
- {
- final boolean ret = client.decrypt(buf, dataSize);
-
- if (ret && buf.hasRemaining())
- {
- // apply limit
- final int limit = buf.limit();
- buf.limit(pos + dataSize);
- final ReceivablePacket<T> cp = _packetHandler.handlePacket(buf, client);
-
- if (cp != null)
- {
- cp._buf = buf;
- cp._sbuf = STRING_BUFFER;
- cp._client = client;
-
- if (cp.read())
- {
- _executor.execute(cp);
- }
-
- cp._buf = null;
- cp._sbuf = null;
- }
- buf.limit(limit);
- }
- }
-
- private final void writeClosePacket(final MMOConnection<T> con)
- {
- SendablePacket<T> sp;
- synchronized (con.getSendQueue())
- {
- if (con.getSendQueue().isEmpty())
- {
- return;
- }
-
- while ((sp = con.getSendQueue().removeFirst()) != null)
- {
- WRITE_BUFFER.clear();
-
- putPacketIntoWriteBuffer(con.getClient(), sp);
-
- WRITE_BUFFER.flip();
-
- try
- {
- con.write(WRITE_BUFFER);
- }
- catch (IOException e)
- {
- // error handling goes on the if bellow
- }
- }
- }
- }
-
- protected final void writePacket(final SelectionKey key, final MMOConnection<T> con)
- {
- if (!prepareWriteBuffer(con))
- {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- return;
- }
-
- DIRECT_WRITE_BUFFER.flip();
-
- final int size = DIRECT_WRITE_BUFFER.remaining();
-
- int result = -1;
-
- try
- {
- result = con.write(DIRECT_WRITE_BUFFER);
- }
- catch (IOException e)
- {
- // error handling goes on the if bellow
- }
-
- // check if no error happened
- if (result >= 0)
- {
- // check if we written everything
- if (result == size)
- {
- // complete write
- synchronized (con.getSendQueue())
- {
- if (con.getSendQueue().isEmpty() && !con.hasPendingWriteBuffer())
- {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- }
- }
- else
- {
- // incomplete write
- con.createWriteBuffer(DIRECT_WRITE_BUFFER);
- }
- }
- else
- {
- con.getClient().onForcedDisconnection();
- closeConnectionImpl(key, con);
- }
- }
-
- private final boolean prepareWriteBuffer(final MMOConnection<T> con)
- {
- boolean hasPending = false;
- DIRECT_WRITE_BUFFER.clear();
-
- // if there is pending content add it
- if (con.hasPendingWriteBuffer())
- {
- con.movePendingWriteBufferTo(DIRECT_WRITE_BUFFER);
- hasPending = true;
- }
-
- if ((DIRECT_WRITE_BUFFER.remaining() > 1) && !con.hasPendingWriteBuffer())
- {
- final NioNetStackList<SendablePacket<T>> sendQueue = con.getSendQueue();
- final T client = con.getClient();
- SendablePacket<T> sp;
-
- for (int i = 0; i < MAX_SEND_PER_PASS; i++)
- {
- synchronized (con.getSendQueue())
- {
- if (sendQueue.isEmpty())
- {
- sp = null;
- }
- else
- {
- sp = sendQueue.removeFirst();
- }
- }
-
- if (sp == null)
- {
- break;
- }
-
- hasPending = true;
-
- // put into WriteBuffer
- putPacketIntoWriteBuffer(client, sp);
-
- WRITE_BUFFER.flip();
-
- if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
- {
- DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
- }
- else
- {
- con.createWriteBuffer(WRITE_BUFFER);
- break;
- }
- }
- }
- return hasPending;
- }
-
- private final void putPacketIntoWriteBuffer(final T client, final SendablePacket<T> sp)
- {
- WRITE_BUFFER.clear();
-
- // reserve space for the size
- final int headerPos = WRITE_BUFFER.position();
- final int dataPos = headerPos + HEADER_SIZE;
- WRITE_BUFFER.position(dataPos);
-
- // set the write buffer
- sp._buf = WRITE_BUFFER;
- // set the client.
- sp._client = client;
- // write content to buffer
- sp.write();
- // delete the write buffer
- sp._buf = null;
-
- // size (inclusive header)
- int dataSize = WRITE_BUFFER.position() - dataPos;
- WRITE_BUFFER.position(dataPos);
- client.encrypt(WRITE_BUFFER, dataSize);
-
- // recalculate size after encryption
- dataSize = WRITE_BUFFER.position() - dataPos;
-
- WRITE_BUFFER.position(headerPos);
- // write header
- WRITE_BUFFER.putShort((short) (dataSize + HEADER_SIZE));
- WRITE_BUFFER.position(dataPos + dataSize);
- }
-
- final void closeConnection(final MMOConnection<T> con)
- {
- synchronized (_pendingClose)
- {
- _pendingClose.addLast(con);
- }
- }
-
- private final void closeConnectionImpl(final SelectionKey key, final MMOConnection<T> con)
- {
- try
- {
- // notify connection
- con.getClient().onDisconnection();
- }
- finally
- {
- try
- {
- // close socket and the SocketChannel
- con.close();
- }
- catch (IOException e)
- {
- // ignore, we are closing anyway
- }
- finally
- {
- con.releaseBuffers();
- // clear attachment
- key.attach(null);
- // cancel key
- key.cancel();
- }
- }
- }
-
- public final void shutdown()
- {
- _shutdown = true;
- }
-
- protected void closeSelectorThread()
- {
- for (final SelectionKey key : _selector.keys())
- {
- try
- {
- key.channel().close();
- }
- catch (IOException e)
- {
- // ignore
- }
- }
-
- try
- {
- _selector.close();
- }
- catch (IOException e)
- {
- // Ignore
- }
- }
-}
Index: build.xml
===================================================================
--- build.xml (revision 6601)
+++ build.xml (working copy)
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project name="MMOCore" default="build" basedir=".">
+<project name="AsyncMMOCore" default="build" basedir=".">
<description>
This script will build the MMOCore lib.
@@ -22,7 +22,6 @@
</description>
<property name="src" location="src" />
- <property name="lib" location="lib" />
<property name="build" location="build" />
<property name="build.classes" location="${build}/classes" />
<property name="build.dist" location="${build}/dist" />
@@ -33,27 +32,17 @@
</fileset>
</path>
- <pathconvert property="manifest.libs" pathsep=" ">
- <path refid="classpath" />
- <mapper>
- <chainedmapper>
- <flattenmapper />
- <globmapper from="*.jar" to="*.jar" />
- </chainedmapper>
- </mapper>
- </pathconvert>
-
- <target name="init" depends="clean,checkRequirements,getChangelogDateVersion" description="Create the output directories.">
+ <target name="init" depends="clean,checkRequirements" description="Create the output directories.">
<mkdir dir="${build}" />
<mkdir dir="${build.classes}" />
</target>
<target name="compile" depends="init" description="Compile the source.">
- <javac srcdir="${src}" classpathref="classpath" destdir="${build.classes}" compiler="javac1.7" debug="true" debuglevel="lines,vars,source" includeantruntime="false" source="1.7" target="1.7" />
+ <javac srcdir="${src}" destdir="${build.classes}" compiler="javac1.7" debug="true" debuglevel="lines,vars,source" includeantruntime="false" source="1.8" target="1.8" />
</target>
<target name="jar" depends="compile" description="Create the jar file.">
- <jar destfile="${build.dist}/mmocore.jar" level="9">
+ <jar destfile="${build.dist}/asyncmmocore.jar" level="9">
<fileset dir="${build.classes}" />
<manifest>
<attribute name="Built-By" value="${user.name}" />
@@ -65,11 +54,7 @@
</jar>
</target>
- <target name="dist" depends="jar">
- <concat destfile="${build.dist}/MMOCore_CHANGELOG.txt">${l2j.changelog}</concat>
- </target>
-
- <target name="build" depends="dist" description="Generates a zip with the MMOCore.">
+ <target name="build" depends="jar" description="Generates a zip with the MMOCore.">
<zip destfile="${build}/mmocore.zip" basedir="${build.dist}" level="9" />
</target>
@@ -88,24 +73,4 @@
<available classname="java.lang.AutoCloseable" property="JDK7.present" />
<fail unless="JDK7.present" message="Java 1.7 is required. But your version is Java ${ant.java.version} and probably JDK is not installed." />
</target>
-
- <target name="getChangelogDateVersion" description="Get Changelog, Date, Version">
- <exec dir="${basedir}" executable="svn" outputproperty="l2j.changelog">
- <arg value="log" />
- <arg value="--stop-on-copy" />
- </exec>
- <tstamp>
- <format property="build.tstamp" pattern="dd/MM/yyyy HH:mm" />
- </tstamp>
- <exec dir="${basedir}" executable="svnversion" outputproperty="l2j.version">
- <arg value="-c" />
- <redirector>
- <outputfilterchain>
- <tokenfilter>
- <replaceregex pattern="[0-9]+\:" replace="" />
- </tokenfilter>
- </outputfilterchain>
- </redirector>
- </exec>
- </target>
</project>
\ No newline at end of file
Index: src/org/mmocore/network/IteratingBlockingQueue.java
===================================================================
--- src/org/mmocore/network/IteratingBlockingQueue.java (revision 0)
+++ src/org/mmocore/network/IteratingBlockingQueue.java (working copy)
@@ -0,0 +1,40 @@
+package org.mmocore.network;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+/** List with synchronized accessors whose {@link #next()} returns the head element and re-appends it at the tail.
+ * @author BiggBoss
+ * @param <T> element type
+ */
+public final class IteratingBlockingQueue<T>
+{
+ private LinkedList<T> queue; // NOTE(review): no method of this class adds elements, so it looks like it can never be non-empty - confirm
+
+ IteratingBlockingQueue()
+ {
+ queue = new LinkedList<>();
+ }
+
+ synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ /**
+ * Rotates the queue: removes the first element, appends it at the end and returns it.
+ * @return the element that was at the head before the call
+ * @throws NoSuchElementException if the queue is empty (this method does not wait for elements)
+ */
+ synchronized T next() throws NoSuchElementException
+ {
+ T t = queue.removeFirst();
+ queue.addLast(t);
+ return t;
+ }
+
+ synchronized void remove(T val)
+ {
+ queue.remove(val);
+ }
+}
Index: src/org/mmocore/network/MMOConnection.java
===================================================================
--- src/org/mmocore/network/MMOConnection.java (revision 6601)
+++ src/org/mmocore/network/MMOConnection.java (working copy)
@@ -17,40 +17,34 @@
*/
package org.mmocore.network;
-import java.io.IOException;
import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.WritableByteChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
/**
* @author KenM
- * @param <T>
+ * @author BiggBoss
+ * @param <T>
+ *
*/
public class MMOConnection<T extends MMOClient<?>>
{
- private final SelectorThread<T> _selectorThread;
+ private final Core<T> _core;
- private final Socket _socket;
+ private final BufferPool _pool;
+ private final AsynchronousSocketChannel _socket;
+
private final InetAddress _address;
- private final ReadableByteChannel _readableByteChannel;
-
- private final WritableByteChannel _writableByteChannel;
-
private final int _port;
private final NioNetStackList<SendablePacket<T>> _sendQueue;
- private final SelectionKey _selectionKey;
+ //private SendablePacket<T> _closePacket;
- // private SendablePacket<T> _closePacket;
-
private ByteBuffer _readBuffer;
private ByteBuffer _primaryWriteBuffer;
@@ -61,28 +55,51 @@
private T _client;
- public MMOConnection(final SelectorThread<T> selectorThread, final Socket socket, final SelectionKey key, boolean tcpNoDelay)
+ private int _lastWritePass;
+ private ByteBuffer _tempBuffer;
+ private boolean _pendingWrite = false;
+ private ReentrantLock _writeLock;
+
+ public MMOConnection(final Core<T> core, final BufferPool bufPool, final AsynchronousSocketChannel socket, InetAddress address, int port)
{
- _selectorThread = selectorThread;
+ _core = core;
+ _pool = bufPool;
_socket = socket;
- _address = socket.getInetAddress();
- _readableByteChannel = socket.getChannel();
- _writableByteChannel = socket.getChannel();
- _port = socket.getPort();
- _selectionKey = key;
+ _address = address;
+ _port = port;
_sendQueue = new NioNetStackList<>();
- try
- {
- _socket.setTcpNoDelay(tcpNoDelay);
- }
- catch (SocketException e)
- {
- e.printStackTrace();
- }
+ _writeLock = new ReentrantLock();
}
+ final void setPendingWritting(boolean val)
+ {
+ _writeLock.lock();
+ _pendingWrite = val;
+ _writeLock.unlock();
+ }
+
+ final void setLastWritePassSize(int size)
+ {
+ _lastWritePass = size;
+ }
+
+ final int getLastWritePassSize()
+ {
+ return _lastWritePass;
+ }
+
+ final void setTempWriteBuffer(final ByteBuffer buf)
+ {
+ _tempBuffer = buf;
+ }
+
+ final ByteBuffer getTempWriteBuffer()
+ {
+ return _tempBuffer;
+ }
+
final void setClient(final T client)
{
_client = client;
@@ -98,33 +115,37 @@
sp._client = _client;
if (_pendingClose)
- {
return;
- }
synchronized (getSendQueue())
{
_sendQueue.addLast(sp);
}
+ executeWriteTask(_core.getWriteCompletionHandler());
+ }
+
+ void executeWriteTask(AbstractWriteHandler<T> handler) // schedules a WriteParseTask unless one is already pending
+ {
if (!_sendQueue.isEmpty())
{
try
{
- _selectionKey.interestOps(_selectionKey.interestOps() | SelectionKey.OP_WRITE);
+ _writeLock.lock();
+ try { // compact braces keep this hunk at its original line count
+ if(!_pendingWrite) {
+ _core.executeWriteTask(new WriteParseTask<>(this, _core, handler));
+ _pendingWrite = true;
+ }
+ } finally { _writeLock.unlock(); } // always release: a throwing submit must not leave the lock held
}
- catch (CancelledKeyException e)
+ catch (Exception e)
{
- // ignore
+ e.printStackTrace();
}
}
}
- final SelectionKey getSelectionKey()
- {
- return _selectionKey;
- }
-
public final InetAddress getInetAddress()
{
return _address;
@@ -135,31 +156,42 @@
return _port;
}
- final void close() throws IOException
+ final void close() throws Exception
{
_socket.close();
}
- final int read(final ByteBuffer buf) throws IOException
+ final void startReadTask(ReadHandler<T> handler)
{
- return _readableByteChannel.read(buf);
+ if(_readBuffer == null)
+ _readBuffer = _pool.getPooledBuffer();
+
+ try
+ {
+ _socket.read(_readBuffer, -1, TimeUnit.MILLISECONDS, this, handler);
+ }
+ catch(Exception e)
+ {
+ _client.onDisconnection();
+ _core.closeConnectionImpl(this);
+ }
}
- final int write(final ByteBuffer buf) throws IOException
+ final void startWriteTask(ByteBuffer buf, AbstractWriteHandler<T> handler)
{
- return _writableByteChannel.write(buf);
+ _socket.write(buf, -1, TimeUnit.MILLISECONDS, this, handler);
}
final void createWriteBuffer(final ByteBuffer buf)
{
if (_primaryWriteBuffer == null)
{
- _primaryWriteBuffer = _selectorThread.getPooledBuffer();
+ _primaryWriteBuffer = _pool.getPooledBuffer();
_primaryWriteBuffer.put(buf);
}
else
{
- final ByteBuffer temp = _selectorThread.getPooledBuffer();
+ final ByteBuffer temp = _pool.getPooledBuffer();
temp.put(buf);
final int remaining = temp.remaining();
@@ -169,7 +201,7 @@
if (remaining >= _primaryWriteBuffer.remaining())
{
temp.put(_primaryWriteBuffer);
- _selectorThread.recycleBuffer(_primaryWriteBuffer);
+ _pool.recycleBuffer(_primaryWriteBuffer);
_primaryWriteBuffer = temp;
}
else
@@ -193,7 +225,7 @@
{
_primaryWriteBuffer.flip();
dest.put(_primaryWriteBuffer);
- _selectorThread.recycleBuffer(_primaryWriteBuffer);
+ _pool.recycleBuffer(_primaryWriteBuffer);
_primaryWriteBuffer = _secondaryWriteBuffer;
_secondaryWriteBuffer = null;
}
@@ -218,26 +250,22 @@
return _sendQueue;
}
- /*
- * final SendablePacket<T> getClosePacket() { return _closePacket; }
- */
+ /*final SendablePacket<T> getClosePacket()
+ {
+ return _closePacket;
+ }*/
@SuppressWarnings("unchecked")
public final void close(final SendablePacket<T> sp)
{
- close(new SendablePacket[]
- {
- sp
- });
+ close(new SendablePacket[] { sp });
}
public final void close(final SendablePacket<T>[] closeList)
{
if (_pendingClose)
- {
return;
- }
synchronized (getSendQueue())
{
@@ -246,43 +274,49 @@
_pendingClose = true;
_sendQueue.clear();
for (SendablePacket<T> sp : closeList)
- {
_sendQueue.addLast(sp);
- }
}
}
try
{
- _selectionKey.interestOps(_selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
+ executeWriteTask(_core.getCloserCompletionHandler());
}
- catch (CancelledKeyException e)
+ catch (Exception e)
{
- // ignore
+ _core.closeConnectionImpl(this);
}
-
- // _closePacket = sp;
- _selectorThread.closeConnection(this);
}
final void releaseBuffers()
{
if (_primaryWriteBuffer != null)
{
- _selectorThread.recycleBuffer(_primaryWriteBuffer);
+ _pool.recycleBuffer(_primaryWriteBuffer);
_primaryWriteBuffer = null;
if (_secondaryWriteBuffer != null)
{
- _selectorThread.recycleBuffer(_secondaryWriteBuffer);
+ _pool.recycleBuffer(_secondaryWriteBuffer);
_secondaryWriteBuffer = null;
}
}
if (_readBuffer != null)
{
- _selectorThread.recycleBuffer(_readBuffer);
+ _pool.recycleBuffer(_readBuffer);
_readBuffer = null;
}
+
+ if(_tempBuffer != null)
+ {
+ _pool.recycleNativeBuffer(_tempBuffer);
+ _tempBuffer = null;
+ }
}
+
+ AsynchronousSocketChannel getSocket()
+ {
+ return _socket;
+ }
}
Index: src/org/mmocore/network/ConnectionAcceptor.java
===================================================================
--- src/org/mmocore/network/ConnectionAcceptor.java (revision 0)
+++ src/org/mmocore/network/ConnectionAcceptor.java (working copy)
@@ -0,0 +1,33 @@
+package org.mmocore.network;
+
+/** Runnable that keeps calling {@link ConnectionManager#tryAcceptConnection()} until the core reports shutdown.
+ * @author BiggBoss
+ * @param <T> client type of the core and manager
+ */
+public final class ConnectionAcceptor<T extends MMOClient<?>> implements Runnable
+{
+ private final Core<T> core;
+ private final ConnectionManager<T> manager;
+
+ ConnectionAcceptor(final Core<T> coreRef, final ConnectionManager<T> man)
+ {
+ core = coreRef;
+ manager = man;
+ }
+
+ @Override
+ public void run()
+ {
+ while(!core.isShutDown())
+ {
+ try
+ {
+ manager.tryAcceptConnection();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(); // report instead of silently dropping; the loop then retries
+ }
+ }
+ }
+}
Index: src/org/mmocore/network/Core.java
===================================================================
--- src/org/mmocore/network/Core.java (revision 0)
+++ src/org/mmocore/network/Core.java (working copy)
@@ -0,0 +1,277 @@
+/* This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * http://www.gnu.org/copyleft/gpl.html
+ */
+package org.mmocore.network;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Holds the shared pieces of the asynchronous network layer (listening socket, completion handlers, buffer pool, parse executor) and forwards work to the implementations passed to the constructor.
+ * @author KenM
+ * @author BiggBoss
+ * @param <T> client type attached to the connections
+ */
+public final class Core<T extends MMOClient<?>>
+{
+	private static final Logger _log = Logger.getLogger(Core.class.getName());
+
+	// Byte order handed to the buffer pool
+	private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+	// Header size handed to the read and write parse tasks
+	private static final int HEADER_SIZE = 2;
+
+	// Listening socket and its channel group; both stay null until openServerSocket() succeeds.
+	// Volatile because shutdown() may be called from another thread than openServerSocket().
+	private volatile AsynchronousServerSocketChannel _server;
+	private volatile AsynchronousChannelGroup _channelGroup;
+
+	private ConnectionManager<T> _conManager;
+	private final ReadHandler<T> _readHandler;
+	private final WriteHandler<T> _writeHandler;
+	private final ConnectionCloseHandler<T> _conCloserHandler;
+	// Implementations
+	private final IPacketHandler<T> _packetHandler;
+	private final IMMOExecutor<T> _executor;
+	private final IClientFactory<T> _clientFactory;
+	private final IAcceptFilter _acceptFilter;
+	// Thread count of the pool that backs the channel group
+	private final int THREADPOOL_SIZE;
+
+	// Buffer pool
+	private final BufferPool _bufPool;
+	// Executor running the read and write parse tasks
+	private final ThreadPoolExecutor _parseExecutor;
+
+	// Volatile: set by shutdown() and polled through isShutDown() by ConnectionAcceptor.run(), which may execute on a different thread.
+	private volatile boolean _shutdown;
+
+	/**
+	 * Builds the buffer pool, the parse executor and the completion handlers; no socket is opened here.
+	 * Also writes the header size and the per-pass limits into the static fields of {@link ReadParseTask} and {@link WriteParseTask}.
+	 * @param sc configuration values (pool sizes, buffer sizes, per-pass limits)
+	 * @param executor receives the packets passed to {@code executePacket}
+	 * @param packetHandler called by {@code handlePacket}
+	 * @param clientFactory returned by {@code getClientFactory}
+	 * @param acceptFilter returned by {@code getAcceptFilter}
+	 */
+	public Core(final CoreConfig sc, final IMMOExecutor<T> executor, final IPacketHandler<T> packetHandler, final IClientFactory<T> clientFactory, final IAcceptFilter acceptFilter)
+	{
+		THREADPOOL_SIZE = sc.ASYNC_THREAD_POOL_SIZE;
+
+		ReadParseTask.HEADER_SIZE = HEADER_SIZE;
+		ReadParseTask.MAX_READ_PER_PASS = sc.MAX_READ_PER_PASS;
+
+		WriteParseTask.HEADER_SIZE = HEADER_SIZE;
+		WriteParseTask.MAX_SEND_PER_PASS = sc.MAX_SEND_PER_PASS;
+
+		_bufPool = new BufferPool(BYTE_ORDER, sc.NATIVE_BUF_POOL_SIZE, sc.NATIVE_BUF_SIZE, sc.HELPER_BUFFER_COUNT, sc.HELPER_BUFFER_SIZE, sc.STRING_BUF_POOL_SIZE, sc.STRING_BUF_SIZE);
+
+		_acceptFilter = acceptFilter;
+		_packetHandler = packetHandler;
+		_clientFactory = clientFactory;
+		_executor = executor;
+
+		_parseExecutor = new ThreadPoolExecutor(sc.WORKERS_THREAD_POOL_SIZE, sc.WORKERS_THREAD_POOL_SIZE, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
+		_readHandler = new ReadHandler<>(this);
+		_writeHandler = new WriteHandler<>(this);
+		_conCloserHandler = new ConnectionCloseHandler<>(this);
+	}
+
+	/**
+	 * Opens the listening socket on a channel group of its own, binds it, then creates and initialises the connection manager.
+	 * If opening or binding fails, the channel group created for this call is shut down again before the exception is rethrown.
+	 * @param address local address to bind to, or {@code null} to bind by port only
+	 * @param tcpPort port to listen on
+	 * @throws IOException if the group or socket cannot be created, or the socket cannot be bound
+	 */
+	public final void openServerSocket(InetAddress address, int tcpPort) throws IOException
+	{
+		final ThreadPoolExecutor executorService = new ThreadPoolExecutor(THREADPOOL_SIZE, THREADPOOL_SIZE, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+		final AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
+		final AsynchronousServerSocketChannel server;
+		try
+		{
+			server = AsynchronousServerSocketChannel.open(group);
+			server.bind((address == null) ? new InetSocketAddress(tcpPort) : new InetSocketAddress(address, tcpPort));
+		}
+		catch (IOException | RuntimeException e)
+		{
+			try
+			{
+				// Closes every channel of the group and lets its threads end.
+				group.shutdownNow();
+			}
+			catch (IOException cleanupFailure)
+			{
+				e.addSuppressed(cleanupFailure);
+			}
+			throw e;
+		}
+
+		_channelGroup = group;
+		_server = server;
+		_conManager = new ConnectionManager<>(server, this);
+		_conManager.init();
+	}
+
+	/** Delegates to the configured packet handler and returns its result. */
+	ReceivablePacket<T> handlePacket(ByteBuffer buf, T client)
+	{
+		return _packetHandler.handlePacket(buf, client);
+	}
+
+	/** Hands the packet to the executor supplied at construction. */
+	void executePacket(ReceivablePacket<T> packet)
+	{
+		_executor.execute(packet);
+	}
+
+	/** Submits the read task to the parse executor. */
+	void executeReadTask(ReadParseTask<T> task)
+	{
+		_parseExecutor.execute(task);
+	}
+
+	/** Submits the write task to the parse executor. */
+	void executeWriteTask(WriteParseTask<T> task)
+	{
+		_parseExecutor.execute(task);
+	}
+
+	BufferPool getBufferPool()
+	{
+		return _bufPool;
+	}
+
+	/** @return {@code true} once {@link #shutdown()} has been called */
+	public boolean isShutDown()
+	{
+		return _shutdown;
+	}
+
+	IClientFactory<T> getClientFactory()
+	{
+		return _clientFactory;
+	}
+
+	IAcceptFilter getAcceptFilter()
+	{
+		return _acceptFilter;
+	}
+
+	ReadHandler<T> getReadCompletionHandler()
+	{
+		return _readHandler;
+	}
+
+	WriteHandler<T> getWriteCompletionHandler()
+	{
+		return _writeHandler;
+	}
+
+	ConnectionCloseHandler<T> getCloserCompletionHandler()
+	{
+		return _conCloserHandler;
+	}
+
+	/**
+	 * Closes the socket of the connection; if that fails with an {@link IOException}, the client is told about the forced disconnection and {@link #closeConnectionImpl(MMOConnection)} runs.
+	 * @param con connection to finish
+	 */
+	final void finishConnection(final MMOConnection<T> con)
+	{
+		try
+		{
+			con.getSocket().close();
+		}
+		catch (IOException e)
+		{
+			con.getClient().onForcedDisconnection();
+			closeConnectionImpl(con);
+		}
+	}
+
+	/**
+	 * Notifies the client of the disconnection, closes the connection and releases its buffers; the buffers are released even when the notification or the close fails.
+	 * @param con connection to tear down
+	 */
+	final void closeConnectionImpl(final MMOConnection<T> con)
+	{
+		try
+		{
+			// notify connection
+			con.getClient().onDisconnection();
+		}
+		finally
+		{
+			try
+			{
+				// close the connection
+				con.close();
+			}
+			catch (Exception e)
+			{
+				_log.log(Level.WARNING, "Failed to close a connection", e);
+			}
+			finally
+			{
+				con.releaseBuffers();
+			}
+		}
+	}
+
+	/**
+	 * Raises the shutdown flag, then closes the listening socket and starts an orderly shutdown of its channel group.
+	 * The parse executor is left running. NOTE(review): connections that are still being closed may submit tasks to it; decide separately when it can be stopped.
+	 */
+	public final void shutdown()
+	{
+		_shutdown = true;
+
+		final AsynchronousServerSocketChannel server = _server;
+		if (server != null)
+		{
+			try
+			{
+				server.close();
+			}
+			catch (IOException e)
+			{
+				_log.log(Level.WARNING, "Failed to close the server socket", e);
+			}
+		}
+
+		final AsynchronousChannelGroup group = _channelGroup;
+		if (group != null)
+		{
+			// Orderly: channels that are already open keep working until they are closed.
+			group.shutdown();
+		}
+	}
+}
Index: src/org/mmocore/network/IAcceptFilter.java
===================================================================
--- src/org/mmocore/network/IAcceptFilter.java (revision 6601)
+++ src/org/mmocore/network/IAcceptFilter.java (working copy)
@@ -17,12 +17,17 @@
*/
package org.mmocore.network;
-import java.nio.channels.SocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
/**
+ * Single-method filter that gives a yes/no answer for an {@link AsynchronousSocketChannel}.
* @author KenM
*/
public interface IAcceptFilter
{
- public boolean accept(SocketChannel sc);
+	/**
+	 * @param sc channel the filter is asked about
+	 * @return the filter's answer. NOTE(review): presumably {@code true} lets the connection through - confirm against the caller.
+	 */
+	public boolean accept(AsynchronousSocketChannel sc);
}
@CantoneseTeaAndMilk
Copy link

Not very stable; CPU usage is almost 90%.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment