Skip to content

Instantly share code, notes, and snippets.

@kzk
Created April 29, 2010 17:16
Show Gist options
  • Save kzk/383908 to your computer and use it in GitHub Desktop.
Save kzk/383908 to your computer and use it in GitHub Desktop.
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.lang.reflect.*;
import java.nio.ByteBuffer;
import org.jboss.netty.bootstrap.*;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.*;
import org.jboss.netty.buffer.*;
import org.jboss.netty.handler.execution.*;
import org.jboss.netty.handler.codec.frame.*;
import org.jboss.netty.handler.codec.oneone.*;
import org.msgpack.*;
/**
 * Sample RPC target object.
 *
 * <p>Offers three public methods that differ only in how many {@code int}
 * arguments they accept; each one returns its own argument count and ignores
 * the argument values.
 */
class HelloHandler {

    /** Accepts no arguments and always answers {@code 0}. */
    public int hello0() {
        return 0;
    }

    /** Accepts one argument (unused) and always answers {@code 1}. */
    public int hello1(int a) {
        return 1;
    }

    /** Accepts two arguments (both unused) and always answers {@code 2}. */
    public int hello2(int a, int b) {
        return 2;
    }
}
/**
 * Frame decoder that pushes incoming bytes into a streaming MessagePack
 * {@link Unpacker} and emits the decoded object once a message is complete.
 *
 * <p>Every instance owns an {@code Unpacker} that keeps partially parsed input
 * between {@code decode} calls, so one instance must serve exactly one
 * pipeline. The coverage is therefore declared as "one" rather than "all".
 */
@ChannelPipelineCoverage("one")
class RPCRequestDecoder extends FrameDecoder {
    /** Streaming parser holding the bytes received so far for this connection. */
    protected Unpacker unpacker;

    public RPCRequestDecoder() {
        super();
        this.unpacker = new Unpacker();
    }

    /**
     * Moves all readable bytes of {@code buffer} into the unpacker and tries to
     * complete one message.
     *
     * @return the decoded list when a full message has been parsed, or
     *         {@code null} when the buffer was empty or the message is still
     *         incomplete
     * @throws IOException if a complete object was parsed but it is not an
     *         {@link AbstractList}
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        int len = buffer.readableBytes();
        if (len == 0) {
            return null;
        }

        // Copy everything that is readable straight into the unpacker's own
        // buffer; from here on the unpacker is the only holder of these bytes.
        unpacker.reserveBuffer(len);
        byte[] unpackerBuf = unpacker.getBuffer();
        buffer.readBytes(unpackerBuf, unpacker.getBufferOffset(), len);
        unpacker.bufferConsumed(len);

        // NOTE(review): the whole channel buffer is drained above, so at most one
        // message is produced per call here. Confirm how a second request that
        // arrives in the same buffer gets parsed.
        if (!unpacker.execute()) {
            // Message not complete yet; wait for more input.
            return null;
        }

        Object data = unpacker.getData();
        unpacker.reset();
        if (data instanceof AbstractList) {
            return data;
        }
        throw new IOException("invalid MPRPC" + data); // TODO
    }
}
/**
 * Encoder that serializes an outgoing message with MessagePack and hands the
 * resulting bytes on as a {@link ChannelBuffer}.
 *
 * <p>The encoder keeps no fields, and a single instance is reused for every
 * pipeline built by the pipeline factory in this file.
 */
@ChannelPipelineCoverage("all")
class RPCResponseEncoder extends OneToOneEncoder {

    /**
     * Packs {@code msg} and returns a freshly allocated buffer holding the
     * packed bytes.
     */
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        // Serialize into memory first so the complete byte image is available.
        ByteArrayOutputStream packed = new ByteArrayOutputStream();
        Packer packer = new Packer(packed);
        packer.pack(msg);

        // Copy the packed image into a channel buffer for the transport.
        ChannelBuffer out = ChannelBuffers.dynamicBuffer();
        out.writeBytes(packed.toByteArray());
        return out;
    }
}
/**
 * Channel handler that dispatches decoded MessagePack-RPC requests to the
 * public methods of a target object via reflection and writes back a response.
 *
 * <p>A request is a four-element list {@code [type, msgid, method, params]};
 * the response written to the channel is the four-element list
 * {@code [TYPE_RESPONSE, msgid, error, result]}, where {@code error} is
 * {@code null} on success and a description string on failure.
 */
@ChannelPipelineCoverage("all")
class MessagePackRPCHandler extends SimpleChannelHandler {
    public static final int TYPE_REQUEST = 0;
    public static final int TYPE_RESPONSE = 1;

    /** Object whose public methods are exposed as RPC calls. */
    protected Object handler = null;
    /** Public methods of {@link #handler}, looked up once at construction time. */
    protected Method[] handlerMethods = null;

    public MessagePackRPCHandler(Object handler) {
        super();
        this.handler = handler;
        this.handlerMethods = handler.getClass().getMethods();
    }

    /** Prints the failure and closes the channel it occurred on. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }

    /**
     * Validates the request envelope, invokes the requested method and writes
     * the response list to the channel.
     *
     * @throws IOException if the envelope does not have four elements, the type
     *         field is not the request type, or the method name is not a byte
     *         array; these propagate to {@link #exceptionCaught}
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        AbstractList a = (AbstractList) e.getMessage();
        if (a.size() != 4) {
            throw new IOException("Invalid MPRPC"); // TODO
        }

        Object type = a.get(0);
        Object msgid = a.get(1);
        Object method = a.get(2);
        Object params = a.get(3);

        // Reject a non-numeric type field as a protocol error instead of letting
        // the cast below blow up with a ClassCastException.
        if (!(type instanceof Number) || ((Number) type).intValue() != TYPE_REQUEST) {
            throw new IOException("Invalid MPRPC"); // TODO
        }
        if (!(method instanceof byte[])) {
            throw new IOException("Invalid method"); // TODO
        }

        Object handlerResult = null;
        String errorMessage = null;
        try {
            AbstractList paramList =
                (params instanceof AbstractList) ? (AbstractList) params : new ArrayList<Object>();
            // Decode with an explicit charset so the lookup does not depend on
            // the platform default encoding.
            String methodName = new String((byte[]) method, "UTF-8");
            handlerResult = callMethod(handler, methodName, paramList);
        } catch (InvocationTargetException wrapped) {
            // Reflection wraps whatever the target method threw; report the real
            // cause, otherwise the error field would be null and the failed call
            // would look like a success.
            Throwable cause = wrapped.getCause();
            errorMessage = describeError(cause != null ? cause : wrapped);
        } catch (Exception rpcError) {
            errorMessage = describeError(rpcError);
        }

        ArrayList<Object> response = new ArrayList<Object>();
        response.add(TYPE_RESPONSE);
        response.add(msgid);
        response.add(errorMessage);
        response.add(handlerResult);
        e.getChannel().write(response);
    }

    /**
     * Looks up a matching method and invokes it with the given parameters.
     *
     * @throws IOException if no method matches the name and parameter count
     */
    protected Object callMethod(Object handler, String method, AbstractList params) throws Exception {
        Method m = findMethod(handler, method, params);
        if (m == null) {
            throw new IOException("No such method");
        }
        return m.invoke(handler, params.toArray());
    }

    /**
     * Finds the first cached method whose name equals {@code method} and whose
     * parameter count equals {@code params.size()}. Methods declared by
     * {@link Object} are never returned, so remote callers cannot reach
     * {@code wait}, {@code notify}, {@code getClass} and the like.
     *
     * @return the matching method, or {@code null} if there is none
     */
    protected Method findMethod(Object handler, String method, AbstractList params) {
        int nParams = params.size();
        for (Method m : handlerMethods) {
            if (m.getDeclaringClass() == Object.class) {
                continue;
            }
            if (method.equals(m.getName()) && nParams == m.getParameterTypes().length) {
                return m;
            }
        }
        return null;
    }

    /** Returns a never-null description: the message if present, else {@code toString()}. */
    private static String describeError(Throwable t) {
        String message = t.getMessage();
        return (message != null) ? message : t.toString();
    }
}
/**
 * Builds the channel pipeline for each accepted connection:
 * request decoder, response encoder, then the RPC handler.
 *
 * <p>The encoder and the RPC handler are created once and shared by every
 * pipeline; the decoder is created per pipeline.
 */
class MessagePackRPCPipelineFactory implements ChannelPipelineFactory {
    private final MessagePackRPCHandler handler;
    private final RPCResponseEncoder encoder;

    public MessagePackRPCPipelineFactory() {
        this.handler = new MessagePackRPCHandler(new HelloHandler());
        this.encoder = new RPCResponseEncoder();
    }

    /**
     * Assembles a new pipeline with the stages "decoder", "encoder" and
     * "handler", in that order.
     */
    public ChannelPipeline getPipeline() throws Exception {
        // No executor is built here: the pipeline has no execution stage, so an
        // executor allocated per connection would never be used.
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", new RPCRequestDecoder());
        pipeline.addLast("encoder", encoder);
        pipeline.addLast("handler", handler);
        return pipeline;
    }
}
/**
 * Entry point that binds the MessagePack-RPC server socket on TCP port 1985.
 */
public class NettyServer {

    /**
     * Configures a server bootstrap with the RPC pipeline factory and binds it.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) throws IOException {
        final int port = 1985;

        // Two separate cached thread pools are handed to the channel factory.
        ExecutorService bossExecutor = Executors.newCachedThreadPool();
        ExecutorService workerExecutor = Executors.newCachedThreadPool();
        ChannelFactory channelFactory =
            new NioServerSocketChannelFactory(bossExecutor, workerExecutor);

        ServerBootstrap server = new ServerBootstrap(channelFactory);
        server.setPipelineFactory(new MessagePackRPCPipelineFactory());

        // Socket options; the "child." prefixed ones use the accepted-connection option names.
        server.setOption("reuseAddress", true);
        server.setOption("child.tcpNoDelay", true);
        server.setOption("child.keepAlive", true);

        server.bind(new InetSocketAddress(port));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment