Skip to content

Instantly share code, notes, and snippets.

@vlaadbrain
Created September 28, 2012 22:00
Show Gist options
  • Save vlaadbrain/3802297 to your computer and use it in GitHub Desktop.
Save vlaadbrain/3802297 to your computer and use it in GitHub Desktop.
Netty Example Server
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
 * Example Netty 3 server that speaks a newline-delimited text protocol on port 5009.
 *
 * <p>On connect, the server sends a short block of {@code S,...} status lines and then a
 * {@code T,<timestamp>} line once per second. When the client sends a line starting with
 * {@code w}, the remainder of the line is treated as a symbol and the server starts
 * streaming a fixed {@code Q,<symbol>,...} quote line for it.
 *
 * <p>One handler instance is created per connection (see {@link #main}), so the instance
 * state below is per-channel. All periodic tasks started for a channel are cancelled when
 * that channel closes.
 */
public class NettyExampleServer extends SimpleChannelHandler {

    /** Charset used to decode inbound frames and encode outbound lines. */
    private static final Charset WIRE_CHARSET = Charset.forName("UTF-8");

    /** Pattern of the timestamp carried by the periodic {@code T,} lines. */
    private static final String TIMESTAMP_PATTERN = "yyyyMMdd HH:mm:ss";

    /**
     * Session key counter. Static so that it is shared by every connection; as an instance
     * field each new handler would start from zero and every client would receive key 1.
     */
    static final AtomicLong KEY = new AtomicLong(0L);

    private static final ScheduledThreadPoolExecutor TIME_SERVICE =
            new ScheduledThreadPoolExecutor(2);
    private static final ScheduledThreadPoolExecutor QUOTE_SERVICE =
            new ScheduledThreadPoolExecutor(5);

    /** Periodic tasks started on behalf of this handler's channel; cancelled on close. */
    private final List<ScheduledFuture<?>> tasks = new CopyOnWriteArrayList<ScheduledFuture<?>>();

    /**
     * Schedules a once-per-second {@code T,<timestamp>} line on the given channel.
     *
     * @param channel channel to write the timestamp lines to
     * @return handle that can be used to cancel the periodic task
     */
    private static ScheduledFuture<?> startSendingTimeToChannel(final Channel channel) {
        return TIME_SERVICE.scheduleAtFixedRate(new Runnable() {
            // SimpleDateFormat is not thread-safe, so each task owns its formatter instead of
            // sharing one across the pool. Runs of a single periodic task never overlap.
            private final SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);

            @Override
            public void run() {
                channel.write("T," + format.format(new Date()) + "\n");
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Schedules a fixed quote line for {@code symbol} every millisecond, after an initial
     * 23 ms delay.
     *
     * @param channel channel to write the quote lines to
     * @param symbol  symbol inserted into each quote line
     * @return handle that can be used to cancel the periodic task
     */
    private static ScheduledFuture<?> startPublishing(final Channel channel, final String symbol) {
        final String quoteLine = "Q," + symbol
                + ",7,147.2700,0.68,0.004638788,105199549,568,148.1100,146.8300,147.2600,147.2700,23200,8900\n";
        return QUOTE_SERVICE.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                channel.write(quoteLine);
            }
        }, 23, 1, TimeUnit.MILLISECONDS);
    }

    /** Sends the connection banner and starts the per-second timestamp feed. */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        Channel ch = e.getChannel();
        ch.write("S,KEY," + KEY.incrementAndGet() + "\n");
        ch.write("S,SERVER CONNECTED\n");
        ch.write("S,IP,1.0.0.127 5009\n");
        ch.write("S,CUST,real_time,1.0.0.127,5009,FOOX\n");
        tasks.add(startSendingTimeToChannel(ch));
    }

    /**
     * Handles one decoded frame. A frame starting with {@code w} starts a quote feed for the
     * symbol that follows; any other frame is ignored.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // Decode only the readable bytes with an explicit charset; this avoids the extra copy
        // and does not depend on the buffer exposing a backing array.
        String message = ((ChannelBuffer) e.getMessage()).toString(WIRE_CHARSET);
        if (message.startsWith("w")) {
            String symbol = message.substring(1); // assume the rest is a SYMBOL!
            if (!symbol.isEmpty()) {
                tasks.add(startPublishing(ctx.getChannel(), symbol));
            }
        }
    }

    /** Cancels every periodic task that was started for this channel. */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
        for (ScheduledFuture<?> task : tasks) {
            task.cancel(false);
        }
        tasks.clear();
    }

    /**
     * Starts the server on port 5009.
     *
     * @param args ignored
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        ChannelFactory factory =
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        // CTS-spec only allows 11 bytes per symbol at most, i do +1 :)
                        new DelimiterBasedFrameDecoder(12, Delimiters.lineDelimiter()),
                        // The handler writes Strings; the socket transport needs ChannelBuffers.
                        new StringEncoder(WIRE_CHARSET),
                        new NettyExampleServer());
            }
        });
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.bind(new InetSocketAddress(5009));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment