Skip to content

Instantly share code, notes, and snippets.

@kzk
Created April 29, 2010 12:37
Show Gist options
  • Save kzk/383532 to your computer and use it in GitHub Desktop.
Save kzk/383532 to your computer and use it in GitHub Desktop.
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.lang.reflect.*;
import org.apache.mina.common.*;
import org.apache.mina.transport.socket.nio.*;
import org.apache.mina.filter.codec.*;
import org.apache.mina.filter.executor.*;
import org.apache.mina.util.SessionLog;
import org.msgpack.*;
//-----------------------------------------------------------------------------
// User
//
class HelloHandler {
public int hello0() {
System.out.println("hello0 handler!");
return 0;
}
public int hello1(int a) {
System.out.println("hello1 handler! a = " + a);
return 1;
}
public int hello2(int a, int b) {
System.out.println("hello2 handler! a = " + a + ", b = " + b);
return 2;
}
}
//-----------------------------------------------------------------------------
// System
//
class RPCRequestDecoder extends CumulativeProtocolDecoder {
protected Unpacker unpacker;
public RPCRequestDecoder() {
super();
this.unpacker = new Unpacker();
}
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
unpacker.feed(in.buf());
if (unpacker.execute()) {
Object data = unpacker.getData();
unpacker.reset();
if (data instanceof AbstractList) {
out.write(data);
return true;
}
throw new IOException("invalid MPRPC"); // TODO
}
return false;
}
}
class RPCResponseEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ByteArrayOutputStream o = new ByteArrayOutputStream();
new Packer(o).pack(message);
out.write(ByteBuffer.wrap(o.toByteArray()));
}
}
class MessagePackRPCCodecFactory implements ProtocolCodecFactory {
public MessagePackRPCCodecFactory() {
}
public ProtocolDecoder getDecoder() throws Exception {
return new RPCRequestDecoder();
}
public ProtocolEncoder getEncoder() throws Exception {
return new RPCResponseEncoder();
}
}
class MessagePackRPCIoHandler extends IoHandlerAdapter {
public static final int TYPE_REQUEST = 0;
public static final int TYPE_RESPONSE = 1;
public static final String INDEX_KEY = MessagePackRPCIoHandler.class.getName() + ".INDEX";
protected Object handler = null;
public MessagePackRPCIoHandler(Object handler) {
super();
this.handler = handler;
}
public void sessionOpened(IoSession session) throws Exception {
session.setAttribute(INDEX_KEY, new Integer(0));
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
SessionLog.warn(session, cause.getMessage(), cause);
}
public void messageReceived(IoSession session, Object message) throws Exception {
AbstractList a = (AbstractList)message;
if (a.size() != 4)
throw new IOException("Invalid MPRPC"); // TODO
Object type = a.get(0);
Object msgid = a.get(1);
Object method = a.get(2);
Object params = a.get(3);
if (((Number)type).intValue() != TYPE_REQUEST)
throw new IOException("Invalid MPRPC"); // TODO
if (!(method instanceof byte[]))
throw new IOException("Invalid method"); // TODO
/*
System.out.println("type = " + a.get(0).getClass());
System.out.println("type = " + a.get(1).getClass());
System.out.println("type = " + a.get(2).getClass());
System.out.println("type = " + a.get(3).getClass());
*/
AbstractList ps = new ArrayList();
if (params instanceof AbstractList)
ps = ((AbstractList)params);
Object ret = callMethod(handler, new String((byte[])method), ps);
ArrayList<Object> response = new ArrayList<Object>();
response.add(TYPE_RESPONSE);
response.add(msgid);
response.add(null);
response.add(ret);
session.write(response);
}
protected Object callMethod(Object handler, String method, AbstractList params) throws Exception {
Method m = findMethod(handler, method, params);
if (m == null) throw new IOException("No such method");
return m.invoke(handler, params.toArray());
}
protected Method findMethod(Object handler, String method, AbstractList params) {
int nParams = params.size();
Method[] ms = handler.getClass().getMethods();
for (int i = 0; i < ms.length; i++) {
Method m = ms[i];
if (!method.equals(m.getName())) continue;
if (nParams != m.getParameterTypes().length) continue;
return m;
}
return null;
}
}
public class MinaServer {
public static void main(String[] args) throws IOException {
int PORT = 1985;
MessagePackRPCIoHandler handler = new MessagePackRPCIoHandler(new HelloHandler());
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1,
Executors.newCachedThreadPool());
SocketAcceptorConfig acceptorConfig = acceptor.getDefaultConfig();
acceptorConfig.setThreadModel(ThreadModel.MANUAL);
DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain();
filterChainBuilder.addLast("protocol", new ProtocolCodecFilter(new MessagePackRPCCodecFactory()));
filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
acceptor.bind(new InetSocketAddress(PORT), handler);
System.out.println("server is listenig at port " + PORT);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment