Skip to content

Instantly share code, notes, and snippets.

@wangzaixiang
Last active November 26, 2017 11:51
Show Gist options
  • Save wangzaixiang/352f9dbc58b90135d3415e2b4a387fce to your computer and use it in GitHub Desktop.
Save wangzaixiang/352f9dbc58b90135d3415e2b4a387fce to your computer and use it in GitHub Desktop.
这是一个很好的 Selector 示例,同时也暴露了一个 bug:客户端连接一次之后,selector.select() 不再阻塞等待,而是立即返回,进入忙循环(死循环)。原因是 accept 之后给已经建立连接的 SocketChannel 错误地注册了 OP_CONNECT(应当注册 OP_READ)。不过,select() 因此不断立即返回的行为,是否也应该算作 JDK 的一个 bug 呢?
package com.demo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
 * Minimal single-threaded NIO server demo driven by one {@link Selector}.
 *
 * <p>The server listens on {@code PORT}, accepts clients in non-blocking mode and, every time a
 * client sends data, echoes the received bytes to stdout and queues a large
 * ({@code BUF_SIZE * BUF_SIZE} bytes) response that is written asynchronously as the socket
 * becomes writable.
 *
 * <p>Created by ever on 2017/11/24.
 */
public class SelectorDemo {
    private static final int BUF_SIZE = 1024;
    private static final int PORT = 8070;
    private static final int TIMEOUT = 3000;

    public static void main(String[] args) throws IOException, InterruptedException {
        selector();
    }

    /**
     * Runs the accept/read/write event loop forever.
     *
     * <p>TODO: use multiple threads to increase throughput.
     *
     * @throws IOException if the listening channel or the selector fails
     * @throws InterruptedException declared for the write handler; see {@link #handleWrite}
     */
    private static void selector() throws IOException, InterruptedException {
        // try-with-resources so the listener and selector are released if the loop dies.
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssc.socket().bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select(TIMEOUT) == 0) {
                    // Timed out without any ready channel.
                    System.out.println("===");
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Remove first: the selected-key set is never cleared by the selector itself.
                    iter.remove();
                    dispatch(key);
                }
            }
        }
    }

    /**
     * Routes one selected key to the handlers matching its ready operations.
     *
     * <p>Validity is re-checked before every readiness test because an earlier handler may have
     * closed the channel, and the readiness methods throw {@link CancelledKeyException} on a
     * cancelled key. An I/O failure on a client connection only drops that connection; a failure
     * on the listening channel is rethrown.
     */
    private static void dispatch(SelectionKey key) throws IOException, InterruptedException {
        try {
            if (key.isValid() && key.isAcceptable()) {
                handleAccept(key);
            }
            if (key.isValid() && key.isConnectable()) {
                handleConnect(key);
            }
            if (key.isValid() && key.isWritable()) {
                handleWrite(key);
            }
            if (key.isValid() && key.isReadable()) {
                handleRead(key);
            }
        } catch (IOException e) {
            if (key.channel() instanceof ServerSocketChannel) {
                throw e; // The listener itself is broken; nothing sensible left to do.
            }
            System.err.println("Closing connection after I/O error: " + e);
            closeQuietly(key.channel());
        }
    }

    /** Closes a channel, ignoring failures; closing also cancels the channel's keys. */
    private static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }

    /**
     * Accepts a pending client connection and registers it for reads.
     *
     * @param key the key of the listening {@link ServerSocketChannel}
     * @throws IOException if accepting fails
     */
    private static void handleAccept(SelectionKey key) throws IOException {
        System.out.println("Handle accept event");
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        try {
            sc.configureBlocking(false);
            // An accepted channel is already connected, so OP_CONNECT must not be used here:
            // registering it made select() return immediately over and over once a client
            // (e.g. telnet) had connected. OP_READ is the correct initial interest.
            sc.register(key.selector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            System.err.println("Dropping new connection: " + e);
            closeQuietly(sc);
        }
    }

    /**
     * Simulates an asynchronous write of a large payload: writes as much of the attached buffer
     * as the socket accepts and stops watching for writability once it is drained.
     *
     * @param key the key of the client channel; its attachment is the pending {@link ByteBuffer}
     * @throws IOException if the write fails
     * @throws InterruptedException never thrown by the current implementation
     */
    private static void handleWrite(SelectionKey key) throws IOException, InterruptedException {
        System.out.println("Handle write event");
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        if (buffer == null) {
            // Nothing queued; stop asking for write readiness.
            key.interestOps(SelectionKey.OP_READ);
            return;
        }
        int writes = sc.write(buffer);
        System.out.println("write bytes:" + writes);
        System.out.println("remain:" + buffer.remaining());
        if (buffer.remaining() == 0) {
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null); // Release the drained payload.
        }
    }

    /**
     * Handles a connect-ready key by switching its interest to reads. The server side never
     * gets here, because accepted channels are already connected.
     *
     * @param key the key reported as connectable
     * @throws ClosedChannelException declared for callers; not thrown by the current body
     */
    public static void handleConnect(SelectionKey key) throws ClosedChannelException {
        System.out.println("Handle connect event");
        System.out.println(key.interestOps());
        key.interestOps(SelectionKey.OP_READ);
        System.out.println(key.interestOps());
    }

    /**
     * Reads available bytes from the client, prints them, and queues a
     * {@code BUF_SIZE * BUF_SIZE}-byte response unless one is still being written.
     *
     * <p>When the peer has closed the connection only that channel is closed; the server keeps
     * running.
     *
     * @param key the key of the readable client channel
     * @throws IOException if reading or closing the channel fails
     */
    public static void handleRead(SelectionKey key) throws IOException {
        System.out.println("Handle read event");
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
        long bytesRead = sc.read(buffer);
        System.out.println("read:" + bytesRead);
        if (bytesRead == -1) {
            System.out.println("Peer closed");
            sc.close(); // Also cancels the key; other clients are unaffected.
            return;
        }
        if (bytesRead == 0) {
            // Spurious wake-up: nothing was received, so there is nothing to answer.
            return;
        }
        buffer.flip();
        while (buffer.hasRemaining()) {
            // Mask to avoid sign extension of bytes >= 0x80 when widening to char.
            System.out.print((char) (buffer.get() & 0xFF));
        }
        Object pending = key.attachment();
        if (pending instanceof ByteBuffer && ((ByteBuffer) pending).hasRemaining()) {
            // A response is still being flushed; replacing it would drop unsent bytes.
            return;
        }
        ByteBuffer writeBuffer = ByteBuffer.allocate(BUF_SIZE * BUF_SIZE);
        for (int i = 0; i < BUF_SIZE * BUF_SIZE; i++) {
            writeBuffer.put((byte) i);
        }
        writeBuffer.flip();
        key.attach(writeBuffer);
        // Won't have any effect this time; takes effect on the next select() call.
        key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment