Skip to content

Instantly share code, notes, and snippets.

@arganzheng
Created November 22, 2013 06:51
Show Gist options
  • Save arganzheng/7595894 to your computer and use it in GitHub Desktop.
Save arganzheng/7595894 to your computer and use it in GitHub Desktop.
Java NIO study Selector Channel select
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
 * Study example for Java NIO: one thread drives several non-blocking client connections
 * through a single {@link Selector}.
 *
 * <p>Every channel moves through the stages connect -&gt; write -&gt; read. A channel is
 * closed as soon as {@code doRead} reports that its response has been consumed, or as
 * soon as its connection attempt fails.
 */
public class JavaNioStudy {

    /** Upper bound, in milliseconds, for a single blocking {@code select} call. */
    private static final long SELECT_TIMEOUT_MILLIS = 500L;

    /**
     * Connects to every given address and services all connections on the calling thread
     * until each channel has been closed (finished or failed) or the thread is interrupted.
     *
     * <p>The selector and any channel still registered with it are always released before
     * this method returns, including when it returns early or throws.
     *
     * @param socketAddresses remote endpoints to connect to; null or unresolved entries are skipped
     * @throws IOException if the selector cannot be opened, a selection operation fails,
     *         or closing a finished channel or the selector fails
     */
    public void select(InetSocketAddress... socketAddresses) throws IOException {
        Selector selector = Selector.open();
        try {
            if (initConnect(selector, socketAddresses) <= 0) {
                return;
            }
            // Keep going while at least one registered channel is still open. Checking the
            // interrupt flag avoids a busy loop: an interrupted thread makes select()
            // return immediately on every call.
            while (hasOpenChannel(selector) && !Thread.currentThread().isInterrupted()) {
                selector.select(SELECT_TIMEOUT_MILLIS);
                // select() only fills the selector's selected-key set; walk it to find
                // out which channels are ready. It is empty when the call timed out.
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    // The selector never clears this set itself, so drop the key first.
                    keyIterator.remove();
                    handleKey(selector, selectionKey);
                }
            }
        } finally {
            closeSelectorAndChannels(selector);
        }
    }

    /**
     * Dispatches one ready key to the matching stage handler. Only one stage is handled
     * per key per selection round.
     */
    private void handleKey(Selector selector, SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            // Querying readiness on a cancelled key would throw CancelledKeyException.
            return;
        }
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (selectionKey.isConnectable()) {
            if (!doConnect(selector, selectionKey)) {
                // The connection attempt failed; release the channel so the loop can end.
                closeQuietly(socketChannel);
            }
        } else if (selectionKey.isReadable()) {
            if (doRead(selector, selectionKey)) {
                // Response fully read: this channel is done.
                socketChannel.close();
            }
        } else if (selectionKey.isWritable()) {
            doWrite(selector, selectionKey);
        }
    }

    /**
     * Handles a write-ready channel and then switches it to waiting for the response.
     * The key's attachment is carried over to the new registration.
     */
    private void doWrite(Selector selector, SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // TODO write the request to socketChannel (selectionKey.attachment() may carry per-channel state)
        try {
            socketChannel.register(selector, SelectionKey.OP_READ, selectionKey.attachment());
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles a read-ready channel.
     *
     * @return {@code true} when reading is finished and the channel should be closed;
     *         the current placeholder always returns {@code true}
     */
    private boolean doRead(Selector selector, SelectionKey selectionKey) {
        // TODO read the response from (SocketChannel) selectionKey.channel()
        return true;
    }

    /**
     * Opens one non-blocking {@link SocketChannel} per address, registers it with the
     * selector and starts connecting. One selector can watch the events of many channels.
     *
     * <p>Addresses that are {@code null} or unresolved are skipped. A channel whose
     * registration or connect attempt fails is closed and not counted.
     *
     * @param selector selector the channels are registered with
     * @param addresses remote endpoints to connect to
     * @return the number of channels whose connection was started (or already established)
     */
    public int initConnect(Selector selector, InetSocketAddress... addresses) {
        if (selector == null) {
            // Fail before opening any channel so nothing can leak.
            throw new NullPointerException("selector");
        }
        if (addresses == null) {
            return 0;
        }
        int size = 0;
        for (InetSocketAddress address : addresses) {
            if (address == null || address.isUnresolved()) {
                // connect() would reject these with an unchecked exception.
                System.err.println("Skipping unusable address: " + address);
                continue;
            }
            SocketChannel channel = null;
            try {
                channel = SocketChannel.open();
                // Switches the channel to non-blocking mode and registers OP_CONNECT.
                registerChannel(selector, channel, SelectionKey.OP_CONNECT);
                if (channel.connect(address)) {
                    // Connected immediately (possible for local peers): OP_CONNECT will
                    // never be reported, so move straight on to the write stage.
                    channel.register(selector, SelectionKey.OP_WRITE);
                }
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(channel);
                continue;
            }
            size++;
        }
        return size;
    }

    /**
     * Puts the channel into non-blocking mode and registers it with the selector.
     * Does nothing when {@code channel} is {@code null}.
     *
     * @param selector selector to register with
     * @param channel channel to register; may be {@code null}
     * @param ops interest set, e.g. {@link SelectionKey#OP_CONNECT}
     * @throws IOException if the channel cannot be configured or registered
     */
    public void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }

    /**
     * Handles a connect-ready key: completes the connection and moves the channel on to
     * the write stage, keeping the key's attachment.
     *
     * @param selector selector the channel is registered with
     * @param selectionKey key reported as connectable
     * @return {@code false} if the connection attempt failed and the channel should be
     *         discarded; {@code true} otherwise
     */
    private boolean doConnect(Selector selector, SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (socketChannel.isConnectionPending()) {
            try {
                if (!socketChannel.finishConnect()) {
                    // Not connected yet; stay registered for OP_CONNECT and retry later.
                    return true;
                }
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
        try {
            socketChannel.register(selector, SelectionKey.OP_WRITE, selectionKey.attachment());
        } catch (ClosedChannelException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /** Returns whether at least one channel registered with the selector is still open. */
    private static boolean hasOpenChannel(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            if (key.isValid() && key.channel().isOpen()) {
                return true;
            }
        }
        return false;
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private static void closeSelectorAndChannels(Selector selector) throws IOException {
        try {
            if (selector.isOpen()) {
                // Iterate over a snapshot so closing channels cannot disturb the key set.
                SelectionKey[] keys = selector.keys().toArray(new SelectionKey[0]);
                for (SelectionKey key : keys) {
                    closeQuietly(key.channel());
                }
            }
        } finally {
            selector.close();
        }
    }

    /** Closes the channel if it is non-null, reporting but not propagating failures. */
    private static void closeQuietly(SelectableChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
@arganzheng
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment