Skip to content

Instantly share code, notes, and snippets.

@ochinchina
Last active September 14, 2020 16:30
Show Gist options
  • Save ochinchina/00b507d815e3ff7bc428 to your computer and use it in GitHub Desktop.
Save ochinchina/00b507d815e3ff7bc428 to your computer and use it in GitHub Desktop.
AsynchronousSocketChannel wrapper to support completely asyn operations
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
* This class wraps the AsynchronousSocketChannel reading and writing operations. The wrapped reading and writing operations is
* completely asynchronous operation. No any exception will be thrown from the read/write operation. If any exception occurs
* during read & write, the failed() method in CompletionHandler will be called.
*
* @author Steven Ou
*
*/
public class AsynchronousSocketChannelWrapper {
private AsynchronousSocketChannel channel_;
private LinkedList< Operation > readOperations_ = new LinkedList< Operation >();
private LinkedList< Operation > writeOperations_ = new LinkedList< Operation >();
private interface Operation {
void execute();
}
private class ProxyCompletionHandler<V,A> implements CompletionHandler<V,A> {
private CompletionHandler<V,? super A> handler_;
private LinkedList< Operation > ops_;
ProxyCompletionHandler( CompletionHandler<V,? super A> handler, LinkedList< Operation > ops ) {
this.handler_ = handler;
this.ops_ = ops;
}
@Override
public void completed(V result, A attachment) {
try {
handler_.completed(result, attachment);
}catch( Throwable ex ) {
}
executeNextOperation( ops_ );
}
@Override
public void failed(Throwable exc, A attachment) {
try {
handler_.failed(exc, attachment);
}catch( Throwable ex ) {
}
executeNextOperation( ops_ );
}
}
private class FirstReadOperation<A> implements Operation {
private ByteBuffer[] dsts;
private int offset;
private int length;
private long timeout;
private TimeUnit unit;
private A attachment;
private CompletionHandler<Long,? super A> handler;
FirstReadOperation( ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler ) {
this.dsts = dsts;
this.offset = offset;
this.length = length;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Long, A>( handler, readOperations_ );
}
@Override
public void execute() {
try {
channel_.read( dsts, offset, length, timeout, unit, attachment, handler );
}catch( Throwable ex ) {
this.handler.failed( ex, attachment );
}
}
}
private class SecondReadOperation<A> implements Operation {
private ByteBuffer dst;
private A attachment;
private CompletionHandler<Integer,? super A> handler;
SecondReadOperation(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler) {
this.dst = dst;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Integer, A>( handler, readOperations_ );
}
@Override
public void execute() {
try {
channel_.read( dst, attachment, handler );
}catch( Throwable ex ) {
handler.failed(ex, attachment);
}
}
}
private class ThirdReadOperation<A> implements Operation {
private ByteBuffer dst;
private long timeout;
private TimeUnit unit;
private A attachment;
private CompletionHandler<Integer,? super A> handler;
ThirdReadOperation( ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler ) {
this.dst = dst;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Integer, A>( handler, readOperations_ );
}
@Override
public void execute() {
try {
channel_.read( dst, timeout, unit, attachment, handler );
}catch( Throwable ex ) {
handler.failed(ex, attachment);
}
}
}
private class FirstWriteOperation<A> implements Operation {
private ByteBuffer[] srcs;
private int offset;
private int length;
private long timeout;
private TimeUnit unit;
private A attachment;
private CompletionHandler<Long,? super A> handler;
public FirstWriteOperation( ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler ) {
this.srcs = srcs;
this.offset = offset;
this.length = length;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Long, A>( handler, writeOperations_ );
}
@Override
public void execute() {
try {
channel_.write( srcs, offset, length, timeout, unit, attachment, handler );
}catch( Throwable ex ) {
handler.failed(ex, attachment);
}
}
}
private class SecondWriteOperation<A> implements Operation {
private ByteBuffer src;
private A attachment;
private CompletionHandler<Integer,? super A> handler;
SecondWriteOperation( ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler ) {
this.src = src;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Integer, A>( handler, writeOperations_ );
}
@Override
public void execute() {
try {
channel_.write( src, attachment, handler );
}catch( Throwable ex ) {
handler.failed(ex, attachment);
}
}
}
private class ThirdWriteOperation<A> implements Operation {
private ByteBuffer src;
private long timeout;
private TimeUnit unit;
private A attachment;
private CompletionHandler<Integer,? super A> handler;
ThirdWriteOperation( ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler ) {
this.src = src;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.handler = new ProxyCompletionHandler<Integer, A>( handler, writeOperations_ );
}
@Override
public void execute() {
try {
channel_.write( src, timeout, unit, attachment, handler );
}catch( Throwable ex ) {
handler.failed(ex, attachment);
}
}
}
public AsynchronousSocketChannelWrapper( AsynchronousSocketChannel channel ) {
this.channel_ = channel;
}
public AsynchronousSocketChannel getChannel() {
return this.channel_;
}
/**
* read data from underline socket channel. No any exception will be thrown by this method. If any exception occurs, the
* handler.failed() method will be called.
*
* @param dsts see AsynchronousSocketChannel.read() operation
* @param offset see AsynchronousSocketChannel.read() operation
* @param length see AsynchronousSocketChannel.read() operation
* @param timeout see AsynchronousSocketChannel.read() operation
* @param unit see AsynchronousSocketChannel.read() operation
* @param attachment see AsynchronousSocketChannel.read() operation
* @param handler see AsynchronousSocketChannel.read() operation
*
* @throws no exception thrown. If any exeption occurs during reading operation, the handler.failed() method will be called.
*/
public <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler) {
addReadOperation( new FirstReadOperation<A>( dsts, offset, length, timeout, unit, attachment, handler ) );
}
/**
*
* @param dst see AsynchronousSocketChannel.read() operation
* @param attachment see AsynchronousSocketChannel.read() operation
* @param handler see AsynchronousSocketChannel.read() operation
*
* @see #read(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
*/
public <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler) {
addReadOperation( new SecondReadOperation<A>( dst, attachment, handler ) );
}
/**
*
* @param dst see AsynchronousSocketChannel.read() operation
* @param timeout see AsynchronousSocketChannel.read() operation
* @param unit see AsynchronousSocketChannel.read() operation
* @param attachment see AsynchronousSocketChannel.read() operation
* @param handler see AsynchronousSocketChannel.read() operation
*
* @see #read(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
*/
public <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler) {
addReadOperation( new ThirdReadOperation<A>( dst, timeout, unit, attachment, handler ) );
}
/**
* write data to peer, no any exception will be thrown by this write operation. If any exception got during writing operation, the
* handler.failed() will be called.
*
* @param srcs see AsynchronousSocketChannel.write() operation
* @param offset see AsynchronousSocketChannel.write() operation
* @param length see AsynchronousSocketChannel.write() operation
* @param timeout see AsynchronousSocketChannel.write() operation
* @param unit see AsynchronousSocketChannel.write() operation
* @param attachment see AsynchronousSocketChannel.write() operation
* @param handler see AsynchronousSocketChannel.write() operation
*
* @throws no exception will be thrown by this write operation. handler.failed() will be called if get any exception during writing.
*/
public <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler) {
addWriteOperation( new FirstWriteOperation<A>( srcs, offset, length, timeout, unit, attachment, handler ));
}
/**
*
* @param src see AsynchronousSocketChannel.write() operation
* @param attachment see AsynchronousSocketChannel.write() operation
* @param handler see AsynchronousSocketChannel.write() operation
*
* @see #write(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
*/
public <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler) {
addWriteOperation( new SecondWriteOperation<A>( src, attachment, handler ));
}
/**
*
* @param src see AsynchronousSocketChannel.write() operation
* @param timeout see AsynchronousSocketChannel.write() operation
* @param unit see AsynchronousSocketChannel.write() operation
* @param attachment see AsynchronousSocketChannel.write() operation
* @param handler see AsynchronousSocketChannel.write() operation
*
* @see #write(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
*/
public <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler) {
addWriteOperation( new ThirdWriteOperation<A>( src, timeout, unit, attachment, handler ) );
}
private void addReadOperation( Operation operation ) {
addOperation( operation, readOperations_ );
}
private void addWriteOperation( Operation operation ) {
addOperation( operation, writeOperations_ );
}
private static void addOperation( Operation operation, LinkedList< Operation > ops ) {
Operation executeOperation = null;
synchronized( ops ) {
ops.add( operation );
if( ops.size() == 1 ) {
executeOperation = ops.getFirst();
}
}
if( executeOperation != null ) {
executeOperation.execute();
}
}
private static void executeNextOperation( LinkedList< Operation > ops ) {
Operation executeOperation = null;
synchronized( ops ) {
ops.removeFirst();
if( !ops.isEmpty() ) {
executeOperation = ops.getFirst();
}
}
if( executeOperation != null ) {
executeOperation.execute();
}
}
}
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class EchoClient {
private AsynchronousSocketChannelWrapper sockChannel_;
public EchoClient( String server, int port ) throws IOException {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
channel.connect( new InetSocketAddress( server, port ), channel, new CompletionHandler<Void, AsynchronousSocketChannel >(){
@Override
public void completed(Void result, AsynchronousSocketChannel channel ) {
sockChannel_ = new AsynchronousSocketChannelWrapper( channel );
startRead();
sayHello();
}
@Override
public void failed(Throwable ex, AsynchronousSocketChannel channel) {
ex.printStackTrace();
}
});
}
private void startRead() {
ByteBuffer buf = ByteBuffer.allocate(1024);
sockChannel_.read( buf, buf, new CompletionHandler<Integer, ByteBuffer >() {
@Override
public void completed(Integer result, ByteBuffer buf) {
System.out.println( "received " + result + " bytes");
buf.flip();
byte[] b = new byte[result];
buf.get( b );
try {
System.out.println( new String( b, "UTF-8"));
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
}
try {
sockChannel_.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit( 0 );
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.exit( -1 );
}
});
}
private void sayHello() {
try {
sockChannel_.write( ByteBuffer.wrap( "hello".getBytes("UTF-8") ), null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println( "write " + result + " bytes");
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public static void main( String...args ) {
try {
EchoClient client = new EchoClient( "127.0.0.1", 3355 );
for( ; ; ) {
Thread.sleep( 1000 );
}
}catch( Exception ex ) {
ex.printStackTrace();
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class EchoServer {
private AsynchronousServerSocketChannel serverChannel_;
public EchoServer( int port ) throws IOException {
serverChannel_ = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress( port ));
startAccept();
}
private void startAccept() {
serverChannel_.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
@Override
public void completed(AsynchronousSocketChannel sockChannel, Void arg) {
new EchoService( sockChannel );
startAccept();
}
@Override
public void failed(Throwable ex, Void arg) {
}
});
}
private class EchoService {
private AsynchronousSocketChannelWrapper sockChannel_;
public EchoService( AsynchronousSocketChannel sockChannel ) {
this.sockChannel_ = new AsynchronousSocketChannelWrapper( sockChannel );
startRead();
}
private void startRead() {
ByteBuffer buf = ByteBuffer.allocate( 1024 );
this.sockChannel_.read( buf, buf, createReadCompletionHandler(buf) );
}
private CompletionHandler<Integer, ByteBuffer> createReadCompletionHandler( ByteBuffer buf ) {
return new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buf ) {
System.out.println( "received " + result + " bytes");
buf.flip();
sockChannel_.write( buf, buf, createWriteCompletionHandler( buf ) );
}
@Override
public void failed(Throwable ex, ByteBuffer buf) {
ex.printStackTrace();
}
};
}
private CompletionHandler<Integer, ByteBuffer> createWriteCompletionHandler( ByteBuffer buf ) {
return new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println( "write " + result + " bytes");
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
};
}
}
public static void main( String...args ) {
try {
new EchoServer( 3355 );
for( ; ; ) {
Thread.sleep( 1000 );
}
}catch( Exception ex ) {
ex.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment