Skip to content

Instantly share code, notes, and snippets.

@eduardoaw
Forked from ochinchina/NettyConnection.java
Created October 23, 2018 13:44
Show Gist options
  • Save eduardoaw/577c9c43d439e735f1ecff29704d23c2 to your computer and use it in GitHub Desktop.
Save eduardoaw/577c9c43d439e735f1ecff29704d23c2 to your computer and use it in GitHub Desktop.
netty client reconnect if the connection is lost or the server is not started
package com.nokia.nls.nmif;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Timer;
import java.util.TimerTask;
public class NettyConnection {
private Bootstrap bootstrap = new Bootstrap();
private SocketAddress addr_;
private Channel channel_;
private Timer timer_;
public NettyConnection( String host, int port, Timer timer ) {
this( new InetSocketAddress( host, port ), timer );
}
public NettyConnection( SocketAddress addr, Timer timer ) {
this.addr_ = addr;
this.timer_ = timer;
bootstrap.group( new NioEventLoopGroup() );
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( createNMMessageHandler() );
}
});
scheduleConnect( 10 );
}
public void send(String msg) throws IOException {
if( channel_ != null && channel_.isActive() ) {
ByteBuf buf = channel_.alloc().buffer().writeBytes( msg.getBytes() );
channel_.writeAndFlush( buf );
} else {
throw new IOException( "Can't send message to inactive connection");
}
}
public void close() {
try {
channel_.close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doConnect() {
try {
ChannelFuture f = bootstrap.connect( addr_ );
f.addListener( new ChannelFutureListener() {
@Override public void operationComplete(ChannelFuture future) throws Exception {
if( !future.isSuccess() ) {//if is not successful, reconnect
future.channel().close();
bootstrap.connect( addr_ ).addListener(this);
} else {//good, the connection is ok
channel_ = future.channel();
//add a listener to detect the connection lost
addCloseDetectListener( channel_ );
connectionEstablished();
}
}
private void addCloseDetectListener(Channel channel) {
//if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future )
throws Exception {
connectionLost();
scheduleConnect( 5 );
}
});
}
});
}catch( Exception ex ) {
scheduleConnect( 1000 );
}
}
private void scheduleConnect( long millis ) {
timer_.schedule( new TimerTask() {
@Override
public void run() {
doConnect();
}
}, millis );
}
private ChannelHandler createNMMessageHandler() {
return new ChannelInboundHandlerAdapter () {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf)msg;
int n = buf.readableBytes();
if( n > 0 ) {
byte[] b = new byte[n];
buf.readBytes(b);
handleMessage( new String( b ) );
}
}
};
}
public void handleMessage(String msg) {
System.out.println( msg );
}
public void connectionLost() {
System.out.println("connectionLost()" );
}
public void connectionEstablished() {
try {
send( "hello");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main( String...args ) {
NettyConnection conn = new NettyConnection( "127.0.0.1", 34567, new Timer() );
for( ; ; ) {
try { Thread.sleep( 100 ); } catch( Exception ex ) {}
}
}
}
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
private ServerSocket serverSock;
public Server(int port ) throws IOException {
serverSock = new ServerSocket( port );
for( ; ; ) {
Socket sock = serverSock.accept();
System.out.println( "socket accepted:" + sock );
try {
byte[] buf = new byte[5];
int n = sock.getInputStream().read( buf );
System.out.println( new String( buf, 0, n ));
sock.getOutputStream().write( "world".getBytes() );
sock.getOutputStream().flush();
Thread.sleep( 1000 );
sock.close();
}catch( Exception ex ) {
}
}
}
public static void main( String ... args ) {
try {
new Server( 34567 );
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment