Created
September 22, 2018 07:31
-
-
Save cgyy/4d607599ab6e590531f489236afa7a27 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ctfo.v2x.v2xcm.gateway.manager; | |
import com.ctfo.v2x.v2xcm.gateway.help.DSUtils; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.List; | |
public class NioClient implements Runnable { | |
private String host; | |
private int port; | |
// The host:port combination to connect to | |
// The selector we'll be monitoring | |
private Selector selector; | |
private SocketChannel socketChannel; | |
// The buffer into which we'll read data when it's available | |
private ByteBuffer readBuffer = ByteBuffer.allocate(8192); | |
// Maps a SocketChannel to a list of ByteBuffer instances | |
private List pendingData = new ArrayList(); | |
public NioClient(String host, int port) throws IOException { | |
this.host = host; | |
this.port = port; | |
this.selector = Selector.open(); | |
this.socketChannel = this.initiateConnection(); | |
} | |
public void send(byte[] data) throws IOException { | |
puts("send"); | |
//设为可写 | |
this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); | |
// And queue the data we want written | |
synchronized (this.pendingData) { | |
this.pendingData.add(ByteBuffer.wrap(data)); | |
} | |
// Finally, wake up our selecting thread so it can make the required changes | |
this.selector.wakeup(); | |
} | |
public void run() { | |
while (true) { | |
try { | |
// Wait for an event one of the registered channels | |
this.selector.select(); | |
// Iterate over the set of keys for which events are available | |
Iterator selectedKeys = this.selector.selectedKeys().iterator(); | |
while (selectedKeys.hasNext()) { | |
SelectionKey key = (SelectionKey) selectedKeys.next(); | |
selectedKeys.remove(); | |
if (!key.isValid()) { | |
continue; | |
} | |
// Check what event is available and deal with it | |
if (key.isConnectable()) { | |
this.finishConnection(key); | |
} else if (key.isReadable()) { | |
this.read(key); | |
} else if (key.isWritable()) { | |
this.write(key); | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
private void read(SelectionKey key) throws IOException { | |
// puts("read"); | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// Clear out our read buffer so it's ready for new data | |
// this.readBuffer.clear(); | |
// Attempt to read off the channel | |
int numRead; | |
try { | |
numRead = socketChannel.read(this.readBuffer); | |
} catch (IOException e) { | |
// The remote forcibly closed the connection, cancel | |
// the selection key and close the channel. | |
e.printStackTrace(); | |
key.cancel(); | |
socketChannel.close(); | |
return; | |
} | |
if (numRead == -1) { | |
// Remote entity shut the socket down cleanly. Do the | |
// same from our end and cancel the channel. | |
key.channel().close(); | |
key.cancel(); | |
return; | |
} | |
// Handle the response | |
// this.handleResponse(socketChannel, this.readBuffer.array(), numRead); | |
this.handleResponse(socketChannel, numRead); | |
} | |
private void handleResponse(SocketChannel socketChannel, int numRead) throws IOException { | |
final int headSize = 78; | |
this.readBuffer.flip(); | |
while (true) { | |
if (this.readBuffer.remaining() < headSize) { | |
break; | |
} | |
this.readBuffer.mark(); | |
byte[] head = new byte[78]; | |
this.readBuffer.get(head); | |
// System.out.println("head:" + head); | |
int len = DSUtils.getIntFromBytes(head, head.length - 4); | |
// System.out.println("payload length:" + len + "remaining" + this.readBuffer.remaining()); | |
if (this.readBuffer.remaining() < len) { | |
this.readBuffer.reset(); | |
break; | |
} | |
byte[] content = new byte[len]; | |
this.readBuffer.get(content); | |
// Tuple2 tuple = new Tuple2<>(head, content); | |
System.out.println("get tuple" +": " + new Date()); | |
} | |
System.out.println("remaining" + this.readBuffer.remaining()); | |
this.readBuffer.compact(); | |
} | |
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException { | |
this.readBuffer.clear(); | |
System.out.println("handleResponse111111" + new Date()); | |
// Make a correctly sized copy of the data before handing it | |
// to the client | |
byte[] rspData = new byte[numRead]; | |
System.arraycopy(data, 0, rspData, 0, numRead); | |
puts("read length" + rspData.length); | |
// Look up the handler for this channel | |
// RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel); | |
// And pass the response to it | |
// if (handler.handleResponse(rspData)) { | |
// // The handler has seen enough, close the connection | |
// socketChannel.close(); | |
// socketChannel.keyFor(this.selector).cancel(); | |
// } | |
} | |
private void write(SelectionKey key) throws IOException { | |
puts("write"); | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
synchronized (this.pendingData) { | |
// Write until there's not more data ... | |
while (!this.pendingData.isEmpty()) { | |
ByteBuffer buf = (ByteBuffer) this.pendingData.get(0); | |
socketChannel.write(buf); | |
if (buf.remaining() > 0) { | |
// ... or the socket's buffer fills up | |
break; | |
} | |
this.pendingData.remove(0); | |
} | |
if (this.pendingData.isEmpty()) { | |
// We wrote away all data, so we're no longer interested | |
// in writing on this socket. Switch back to waiting for | |
// data. | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
} | |
} | |
private void finishConnection(SelectionKey key) throws IOException { | |
puts("finishConnection"); | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// Finish the connection. If the connection operation failed | |
// this will raise an IOException. | |
try { | |
socketChannel.finishConnect(); | |
} catch (IOException e) { | |
// Cancel the channel's registration with our selector | |
System.out.println(e); | |
key.cancel(); | |
return; | |
} | |
// Register an interest in writing on this channel | |
// key.interestOps(SelectionKey.OP_WRITE); | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
private SocketChannel initiateConnection() throws IOException { | |
// Create a non-blocking socket channel | |
SocketChannel socketChannel = SocketChannel.open(); | |
socketChannel.configureBlocking(false); | |
// Kick off connection establishment | |
socketChannel.connect(new InetSocketAddress(this.host, this.port)); | |
socketChannel.register(this.selector, SelectionKey.OP_CONNECT); | |
// socketChannel.register(this.selector, SelectionKey.OP_CONNECT |SelectionKey.OP_WRITE | SelectionKey.OP_READ); | |
// Queue a channel registration since the caller is not the | |
// selecting thread. As part of the registration we'll register | |
// an interest in connection events. These are raised when a channel | |
// is ready to complete connection establishment. | |
// synchronized (this.pendingChanges) { | |
// this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); | |
// } | |
return socketChannel; | |
} | |
private void puts(String s) { | |
System.out.println(s); | |
} | |
public static void main(String[] args) { | |
try { | |
NioClient client = new NioClient("localhost", 3000); | |
Thread t = new Thread(client); | |
// t.setDaemon(true); | |
t.start(); | |
Thread.sleep(10_000); | |
client.send(new byte[78]); | |
Thread.sleep(10_000); | |
client.send(new byte[78]); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment