Created
April 29, 2010 12:37
-
-
Save kzk/383532 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.*; | |
import java.net.*; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import java.lang.reflect.*; | |
import org.apache.mina.common.*; | |
import org.apache.mina.transport.socket.nio.*; | |
import org.apache.mina.filter.codec.*; | |
import org.apache.mina.filter.executor.*; | |
import org.apache.mina.util.SessionLog; | |
import org.msgpack.*; | |
//----------------------------------------------------------------------------- | |
// User | |
// | |
/**
 * Sample service object whose public methods are the RPC targets.
 *
 * <p>Every method prints a line describing the call to standard output and
 * returns a fixed integer equal to the number of arguments it accepts.
 */
class HelloHandler {

    /**
     * Zero-argument call.
     *
     * @return always {@code 0}
     */
    public int hello0() {
        final String line = "hello0 handler!";
        System.out.println(line);
        return 0;
    }

    /**
     * One-argument call.
     *
     * @param a value echoed in the printed line
     * @return always {@code 1}
     */
    public int hello1(int a) {
        final StringBuilder line = new StringBuilder("hello1 handler! a = ");
        line.append(a);
        System.out.println(line.toString());
        return 1;
    }

    /**
     * Two-argument call.
     *
     * @param a first value echoed in the printed line
     * @param b second value echoed in the printed line
     * @return always {@code 2}
     */
    public int hello2(int a, int b) {
        final StringBuilder line = new StringBuilder("hello2 handler! a = ");
        line.append(a).append(", b = ").append(b);
        System.out.println(line.toString());
        return 2;
    }
}
//----------------------------------------------------------------------------- | |
// System | |
// | |
class RPCRequestDecoder extends CumulativeProtocolDecoder { | |
protected Unpacker unpacker; | |
public RPCRequestDecoder() { | |
super(); | |
this.unpacker = new Unpacker(); | |
} | |
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { | |
unpacker.feed(in.buf()); | |
if (unpacker.execute()) { | |
Object data = unpacker.getData(); | |
unpacker.reset(); | |
if (data instanceof AbstractList) { | |
out.write(data); | |
return true; | |
} | |
throw new IOException("invalid MPRPC"); // TODO | |
} | |
return false; | |
} | |
} | |
class RPCResponseEncoder extends ProtocolEncoderAdapter { | |
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { | |
ByteArrayOutputStream o = new ByteArrayOutputStream(); | |
new Packer(o).pack(message); | |
out.write(ByteBuffer.wrap(o.toByteArray())); | |
} | |
} | |
class MessagePackRPCCodecFactory implements ProtocolCodecFactory { | |
public MessagePackRPCCodecFactory() { | |
} | |
public ProtocolDecoder getDecoder() throws Exception { | |
return new RPCRequestDecoder(); | |
} | |
public ProtocolEncoder getEncoder() throws Exception { | |
return new RPCResponseEncoder(); | |
} | |
} | |
class MessagePackRPCIoHandler extends IoHandlerAdapter { | |
public static final int TYPE_REQUEST = 0; | |
public static final int TYPE_RESPONSE = 1; | |
public static final String INDEX_KEY = MessagePackRPCIoHandler.class.getName() + ".INDEX"; | |
protected Object handler = null; | |
public MessagePackRPCIoHandler(Object handler) { | |
super(); | |
this.handler = handler; | |
} | |
public void sessionOpened(IoSession session) throws Exception { | |
session.setAttribute(INDEX_KEY, new Integer(0)); | |
} | |
public void exceptionCaught(IoSession session, Throwable cause) throws Exception { | |
SessionLog.warn(session, cause.getMessage(), cause); | |
} | |
public void messageReceived(IoSession session, Object message) throws Exception { | |
AbstractList a = (AbstractList)message; | |
if (a.size() != 4) | |
throw new IOException("Invalid MPRPC"); // TODO | |
Object type = a.get(0); | |
Object msgid = a.get(1); | |
Object method = a.get(2); | |
Object params = a.get(3); | |
if (((Number)type).intValue() != TYPE_REQUEST) | |
throw new IOException("Invalid MPRPC"); // TODO | |
if (!(method instanceof byte[])) | |
throw new IOException("Invalid method"); // TODO | |
/* | |
System.out.println("type = " + a.get(0).getClass()); | |
System.out.println("type = " + a.get(1).getClass()); | |
System.out.println("type = " + a.get(2).getClass()); | |
System.out.println("type = " + a.get(3).getClass()); | |
*/ | |
AbstractList ps = new ArrayList(); | |
if (params instanceof AbstractList) | |
ps = ((AbstractList)params); | |
Object ret = callMethod(handler, new String((byte[])method), ps); | |
ArrayList<Object> response = new ArrayList<Object>(); | |
response.add(TYPE_RESPONSE); | |
response.add(msgid); | |
response.add(null); | |
response.add(ret); | |
session.write(response); | |
} | |
protected Object callMethod(Object handler, String method, AbstractList params) throws Exception { | |
Method m = findMethod(handler, method, params); | |
if (m == null) throw new IOException("No such method"); | |
return m.invoke(handler, params.toArray()); | |
} | |
protected Method findMethod(Object handler, String method, AbstractList params) { | |
int nParams = params.size(); | |
Method[] ms = handler.getClass().getMethods(); | |
for (int i = 0; i < ms.length; i++) { | |
Method m = ms[i]; | |
if (!method.equals(m.getName())) continue; | |
if (nParams != m.getParameterTypes().length) continue; | |
return m; | |
} | |
return null; | |
} | |
} | |
public class MinaServer { | |
public static void main(String[] args) throws IOException { | |
int PORT = 1985; | |
MessagePackRPCIoHandler handler = new MessagePackRPCIoHandler(new HelloHandler()); | |
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, | |
Executors.newCachedThreadPool()); | |
SocketAcceptorConfig acceptorConfig = acceptor.getDefaultConfig(); | |
acceptorConfig.setThreadModel(ThreadModel.MANUAL); | |
DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain(); | |
filterChainBuilder.addLast("protocol", new ProtocolCodecFilter(new MessagePackRPCCodecFactory())); | |
filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); | |
acceptor.bind(new InetSocketAddress(PORT), handler); | |
System.out.println("server is listenig at port " + PORT); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment