Created
April 29, 2010 17:16
-
-
Save kzk/383908 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.*; | |
import java.net.*; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import java.lang.reflect.*; | |
import java.nio.ByteBuffer; | |
import org.jboss.netty.bootstrap.*; | |
import org.jboss.netty.channel.*; | |
import org.jboss.netty.channel.socket.nio.*; | |
import org.jboss.netty.buffer.*; | |
import org.jboss.netty.handler.execution.*; | |
import org.jboss.netty.handler.codec.frame.*; | |
import org.jboss.netty.handler.codec.oneone.*; | |
import org.msgpack.*; | |
/**
 * Sample RPC target object.
 *
 * <p>Each public method is a remotely callable procedure; the three variants
 * differ only in how many arguments they accept and each returns its own
 * argument count.
 */
class HelloHandler {
    /**
     * Zero-argument procedure.
     *
     * @return always {@code 0}
     */
    public int hello0() {
        return 0;
    }

    /**
     * One-argument procedure; the argument is ignored.
     *
     * @param a unused
     * @return always {@code 1}
     */
    public int hello1(int a) {
        return 1;
    }

    /**
     * Two-argument procedure; both arguments are ignored.
     *
     * @param a unused
     * @param b unused
     * @return always {@code 2}
     */
    public int hello2(int a, int b) {
        return 2;
    }
}
@ChannelPipelineCoverage("all") | |
class RPCRequestDecoder extends FrameDecoder { | |
protected Unpacker unpacker; | |
public RPCRequestDecoder() { | |
super(); | |
this.unpacker = new Unpacker(); | |
} | |
@Override | |
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { | |
int len = buffer.readableBytes(); | |
if (len == 0) return null; | |
unpacker.reserveBuffer(len); | |
byte[] unpacker_buf = unpacker.getBuffer(); | |
buffer.readBytes(unpacker_buf, unpacker.getBufferOffset(), len); | |
unpacker.bufferConsumed(len); | |
if (unpacker.execute()) { | |
Object data = unpacker.getData(); | |
unpacker.reset(); | |
if (data instanceof AbstractList) | |
return data; | |
throw new IOException("invalid MPRPC" + data); // TODO | |
} | |
return null; | |
} | |
} | |
@ChannelPipelineCoverage("all") | |
class RPCResponseEncoder extends OneToOneEncoder { | |
@Override | |
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { | |
ByteArrayOutputStream o = new ByteArrayOutputStream(); | |
new Packer(o).pack(msg); | |
byte[] b = o.toByteArray(); | |
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); | |
buf.writeBytes(b); | |
return buf; | |
} | |
} | |
@ChannelPipelineCoverage("all") | |
class MessagePackRPCHandler extends SimpleChannelHandler { | |
public static final int TYPE_REQUEST = 0; | |
public static final int TYPE_RESPONSE = 1; | |
protected Object handler = null; | |
protected Method[] handlerMethods = null; | |
public MessagePackRPCHandler(Object handler) { | |
super(); | |
this.handler = handler; | |
this.handlerMethods = handler.getClass().getMethods(); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { | |
e.getCause().printStackTrace(); | |
Channel ch = e.getChannel(); | |
ch.close(); | |
} | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { | |
AbstractList a = (AbstractList)e.getMessage(); | |
if (a.size() != 4) | |
throw new IOException("Invalid MPRPC"); // TODO | |
Object type = a.get(0); | |
Object msgid = a.get(1); | |
Object method = a.get(2); | |
Object params = a.get(3); | |
if (((Number)type).intValue() != TYPE_REQUEST) | |
throw new IOException("Invalid MPRPC"); // TODO | |
if (!(method instanceof byte[])) | |
throw new IOException("Invalid method"); // TODO | |
Object handlerResult = null; | |
String errorMessage = null; | |
try { | |
AbstractList paramList = (params instanceof AbstractList) ? (AbstractList)params : new ArrayList(); | |
handlerResult = callMethod(handler, new String((byte[])method), paramList); | |
} catch (Exception rpc_e) { | |
errorMessage = rpc_e.getMessage(); | |
} | |
ArrayList<Object> response = new ArrayList<Object>(); | |
response.add(TYPE_RESPONSE); | |
response.add(msgid); | |
response.add(errorMessage); | |
response.add(handlerResult); | |
e.getChannel().write(response); | |
} | |
protected Object callMethod(Object handler, String method, AbstractList params) throws Exception { | |
Method m = findMethod(handler, method, params); | |
if (m == null) throw new IOException("No such method"); | |
return m.invoke(handler, params.toArray()); | |
} | |
protected Method findMethod(Object handler, String method, AbstractList params) { | |
int nParams = params.size(); | |
Method[] ms = handlerMethods; | |
for (int i = 0; i < ms.length; i++) { | |
Method m = ms[i]; | |
if (!method.equals(m.getName())) continue; | |
if (nParams != m.getParameterTypes().length) continue; | |
return m; | |
} | |
return null; | |
} | |
} | |
class MessagePackRPCPipelineFactory implements ChannelPipelineFactory { | |
private final MessagePackRPCHandler handler; | |
private final RPCResponseEncoder encoder; | |
public MessagePackRPCPipelineFactory() { | |
this.handler = new MessagePackRPCHandler(new HelloHandler()); | |
this.encoder = new RPCResponseEncoder(); | |
} | |
public ChannelPipeline getPipeline() throws Exception { | |
MemoryAwareThreadPoolExecutor eventExecutor = | |
new MemoryAwareThreadPoolExecutor(5, 1000000, 10000000, 100, | |
TimeUnit.MILLISECONDS); | |
ChannelPipeline pipeline = Channels.pipeline(); | |
pipeline.addLast("decoder", new RPCRequestDecoder()); | |
pipeline.addLast("encoder", encoder); | |
//pipeline.addLast("executor", new ExecutionHandler(eventExecutor)); | |
pipeline.addLast("handler", handler); | |
return pipeline; | |
} | |
} | |
public class NettyServer { | |
public static void main(String[] args) throws IOException { | |
int PORT = 1985; | |
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), | |
Executors.newCachedThreadPool()); | |
ServerBootstrap bootstrap = new ServerBootstrap(factory); | |
bootstrap.setPipelineFactory(new MessagePackRPCPipelineFactory()); | |
bootstrap.setOption("reuseAddress", true); | |
bootstrap.setOption("child.tcpNoDelay", true); | |
bootstrap.setOption("child.keepAlive", true); | |
bootstrap.bind(new InetSocketAddress(PORT)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment