Skip to content

Instantly share code, notes, and snippets.

@cgyy
Created September 22, 2018 07:31
Show Gist options
  • Save cgyy/4d607599ab6e590531f489236afa7a27 to your computer and use it in GitHub Desktop.
Save cgyy/4d607599ab6e590531f489236afa7a27 to your computer and use it in GitHub Desktop.
package com.ctfo.v2x.v2xcm.gateway.manager;
import com.ctfo.v2x.v2xcm.gateway.help.DSUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
 * Single-connection, non-blocking TCP client driven by one {@link Selector}.
 *
 * <p>Threading model: one thread executes {@link #run()} and is the only thread that touches
 * the selection key, the socket channel and the read buffer. Other threads may call
 * {@link #send(byte[])}; it only appends to the pending-write queue (guarded by the queue's
 * monitor) and wakes the selector, which then enables write interest itself.
 *
 * <p>Incoming bytes are split into frames made of a {@value #HEADER_SIZE}-byte header followed
 * by a payload whose length is decoded from the header with
 * {@code DSUtils.getIntFromBytes(head, head.length - 4)}.
 */
public class NioClient implements Runnable {
    /** Size in bytes of the fixed frame header. */
    private static final int HEADER_SIZE = 78;
    /** Capacity in bytes of the buffer that accumulates incoming data. */
    private static final int READ_BUFFER_SIZE = 8192;

    // The host:port combination to connect to
    private final String host;
    private final int port;
    // The selector we'll be monitoring
    private final Selector selector;
    private final SocketChannel socketChannel;
    // Accumulates received bytes; kept in "write mode" between reads so that a partially
    // received frame survives until the rest of it arrives
    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
    // Buffers waiting to be written, in order; every access synchronizes on this list
    private final List<ByteBuffer> pendingData = new ArrayList<>();

    /**
     * Opens a selector and starts a non-blocking connection to the given endpoint. The
     * connection is completed by the thread that executes {@link #run()}.
     *
     * @param host remote host name or address
     * @param port remote TCP port
     * @throws IOException if the selector or the channel cannot be opened, or the connection
     *     attempt fails immediately
     */
    public NioClient(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        this.selector = Selector.open();
        try {
            this.socketChannel = this.initiateConnection();
        } catch (IOException | RuntimeException e) {
            // Do not leak the selector when the connection could not even be started.
            try {
                this.selector.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Queues {@code data} for transmission and wakes the selecting thread. The bytes are
     * written by the thread running {@link #run()} once the connection is established.
     *
     * <p>The array is wrapped, not copied, so the caller must not modify it until it has been
     * written.
     *
     * @param data bytes to send
     * @throws IOException if the channel has already been closed
     */
    public void send(byte[] data) throws IOException {
        puts("send");
        if (!this.socketChannel.isOpen()) {
            throw new IOException("channel is closed");
        }
        // Queue first, wake second: the selecting thread turns on write interest only after
        // it sees queued data, so a send can neither be lost nor clobber a pending OP_CONNECT.
        synchronized (this.pendingData) {
            this.pendingData.add(ByteBuffer.wrap(data));
        }
        this.selector.wakeup();
    }

    /**
     * Event loop: waits for readiness events and dispatches them until the selector is closed.
     * Exceptions are printed and the loop continues.
     */
    @Override
    public void run() {
        while (this.selector.isOpen()) {
            try {
                this.requestWriteIfPending();
                // Wait for an event on the registered channel (or a wakeup from send())
                this.selector.select();
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    try {
                        this.dispatch(key);
                    } catch (IOException e) {
                        // A failed channel stays "ready" forever; drop it so the loop does not
                        // spin on the same error.
                        e.printStackTrace();
                        key.cancel();
                        closeAndReport((SocketChannel) key.channel());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Routes one ready key to the connect, read and/or write handler. */
    private void dispatch(SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            this.finishConnection(key);
            return;
        }
        if (key.isReadable()) {
            this.read(key);
        }
        // read() may have cancelled the key, so re-check validity before querying it again.
        if (key.isValid() && key.isWritable()) {
            this.write(key);
        }
    }

    /**
     * Adds write interest when data is queued and the channel is connected. Runs on the
     * selecting thread only, so the key is never modified concurrently with a select.
     */
    private void requestWriteIfPending() {
        SelectionKey key = this.socketChannel.keyFor(this.selector);
        if (key == null || !key.isValid() || !this.socketChannel.isConnected()) {
            return;
        }
        synchronized (this.pendingData) {
            if (!this.pendingData.isEmpty()) {
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            }
        }
    }

    /**
     * Reads available bytes into the read buffer and extracts complete frames. Closes the
     * channel and cancels the key on end-of-stream or on a read failure.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        int numRead;
        try {
            numRead = channel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            e.printStackTrace();
            key.cancel();
            channel.close();
            return;
        }
        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            channel.close();
            key.cancel();
            return;
        }
        this.handleResponse();
    }

    /**
     * Consumes every complete frame currently held in the read buffer and keeps any trailing
     * partial frame for the next read.
     *
     * @throws IOException if a header announces a negative payload length or one that can
     *     never fit in the read buffer together with its header
     */
    private void handleResponse() throws IOException {
        final int maxPayload = this.readBuffer.capacity() - HEADER_SIZE;
        this.readBuffer.flip();
        try {
            while (this.readBuffer.remaining() >= HEADER_SIZE) {
                this.readBuffer.mark();
                byte[] head = new byte[HEADER_SIZE];
                this.readBuffer.get(head);
                int len = DSUtils.getIntFromBytes(head, head.length - 4);
                if (len < 0 || len > maxPayload) {
                    // Such a frame could never complete: the buffer would fill up and every
                    // later read would return 0, spinning the event loop.
                    throw new IOException(
                        "invalid payload length " + len + " (allowed 0.." + maxPayload + ")");
                }
                if (this.readBuffer.remaining() < len) {
                    // Payload not fully received yet: rewind to the start of the header.
                    this.readBuffer.reset();
                    break;
                }
                byte[] content = new byte[len];
                this.readBuffer.get(content);
                System.out.println("get tuple" + ": " + new Date());
            }
            System.out.println("remaining" + this.readBuffer.remaining());
        } finally {
            // Always return the buffer to "write mode", even when parsing failed.
            this.readBuffer.compact();
        }
    }

    /**
     * Writes queued buffers until the queue is empty or the socket's send buffer is full, and
     * drops write interest once everything has been written.
     */
    private void write(SelectionKey key) throws IOException {
        puts("write");
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (this.pendingData) {
            // Write until there's not more data ...
            while (!this.pendingData.isEmpty()) {
                ByteBuffer buf = this.pendingData.get(0);
                channel.write(buf);
                if (buf.hasRemaining()) {
                    // ... or the socket's buffer fills up
                    break;
                }
                this.pendingData.remove(0);
            }
            if (this.pendingData.isEmpty()) {
                // Everything was written, so go back to waiting for incoming data only.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Completes the pending connection and switches the key to read interest. On failure the
     * key is cancelled and the channel closed.
     */
    private void finishConnection(SelectionKey key) {
        puts("finishConnection");
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (!channel.finishConnect()) {
                // Not connected yet; keep OP_CONNECT interest and wait for the next event.
                return;
            }
        } catch (IOException e) {
            // Cancel the channel's registration with our selector
            System.out.println(e);
            key.cancel();
            closeAndReport(channel);
            return;
        }
        // Write interest is added by the event loop when data is queued.
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Creates the non-blocking channel, starts connecting and registers it with the selector.
     *
     * @return the registered channel
     * @throws IOException if the channel cannot be opened, configured, connected or registered
     */
    private SocketChannel initiateConnection() throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.configureBlocking(false);
            // connect() returns true when the connection is established immediately; in that
            // case there is nothing left to finish, so go straight to reading.
            boolean connected = channel.connect(new InetSocketAddress(this.host, this.port));
            channel.register(
                this.selector, connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
            return channel;
        } catch (IOException | RuntimeException e) {
            closeAndReport(channel);
            throw e;
        }
    }

    /** Closes the channel, printing (not propagating) any failure to do so. */
    private static void closeAndReport(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints a trace message to standard output. */
    private void puts(String s) {
        System.out.println(s);
    }

    /**
     * Demo entry point: connects to localhost:3000 and sends two 78-byte all-zero messages,
     * ten seconds apart.
     */
    public static void main(String[] args) {
        try {
            NioClient client = new NioClient("localhost", 3000);
            Thread t = new Thread(client);
            t.start();
            Thread.sleep(10_000);
            client.send(new byte[78]);
            Thread.sleep(10_000);
            client.send(new byte[78]);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment