@gMan1990
Last active May 8, 2019 11:50
Socket, Channel, Selector, Asynchronous: blocking vs. non-blocking, synchronous vs. asynchronous

[TOC]

Disclaimer

The code and summary below may not be entirely correct.

UNIX Network Programming, I/O Models

https://www.zhihu.com/question/19732473/answer/26101328

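Summary

  • Premise
    • Everything is explained with a "send one message, receive one message" (question and answer) exchange to keep it easy to follow
    • The code (not production code) is written the same way, so the variants can be compared side by side
  • Socket: blocking
    • read blocks. For example, you send a girl a WeChat message and have to wait for her reply before you can send anything else (you can set a timeout with setSoTimeout and send again once it expires)
    • read: either a message arrives or an error occurs
    • Server: to handle more concurrent sessions, use a thread pool with one thread per session; a new session has to wait until an old one ends. Client: the current thread cannot run any later code until the girl's message arrives (see "Synchronous / asynchronous")
  • Channel: non-blocking (can also block)
    • read does not block. For example, you send the girl a WeChat message and she does not reply (you keep calling read in a loop); you are free to send another message a minute later, and one more three minutes after that
    • read: either a message arrives, or nothing has arrived yet, or an error occurs
    • Server: same as Socket. Client: same as Socket
  • Selector: multiplexing
    • For example, you send the girl a WeChat message and then react to events (select)
    if (isReadable) {// she replied
        // reply to her
        if (isWritable) {
            // chat about something else, as you wish
        }
    } else if (isWritable) {// she has not replied yet
        // send another message a minute later, and one more three minutes after that
    }
    • read: same as Channel
    • Server: a thread is needed only for a session that is ready. For example, the girl is chatting with 3 admirers 🐶 at once and 2 of them have sent messages; she only hands those 2 to the thread pool. Client: same as Channel
  • Signal driven: unknown (not covered)
  • Asynchronous
    • Notification mechanism
    • read: same as Socket
    • Server: same as Selector. Client: asynchronous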
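The three read outcomes listed for a non-blocking Channel can be observed on a loopback connection. This is a minimal sketch, not part of the original code: the class `ReadOutcomes` and the helper `pollRead` are made-up names, and it assumes a free local port is available.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

class ReadOutcomes {

    /** Hypothetical helper: polls a non-blocking channel until read() returns non-zero. */
    static int pollRead(SocketChannel ch, ByteBuffer bb) throws IOException {
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
        int n;
        while ((n = ch.read(bb)) == 0 && System.nanoTime() < deadline) {
            Thread.yield(); // the thread could do other work here instead
        }
        return n;
    }

    /** Returns {read before anything was sent, read after a message, read after peer close}. */
    static int[] demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                    SocketChannel peer = server.accept()) {
                client.configureBlocking(false);
                ByteBuffer bb = ByteBuffer.allocate(16);

                int nothingYet = client.read(bb); // no reply yet: returns 0 at once

                peer.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));
                int gotMessage = pollRead(client, bb); // number of bytes received

                peer.close();
                bb.clear();
                int peerClosed = pollRead(client, bb); // -1 signals end of stream
                return new int[] {nothingYet, gotMessage, peerClosed};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

A blocking Socket never produces the middle case: its read either returns data, reports end of stream, or throws.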

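Synchronous / asynchronous

Take getting the girl's message as an example. There are three ways to do it: a blocking read, a non-blocking read in a loop, and a blocking select (or a non-blocking select in a loop). In all three, the rest of the code on that thread cannot run; the thread has to block or loop until her reply arrives. That is what makes all three synchronous. With asynchronous I/O the call returns immediately and the thread is notified when the reply arrives.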
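The difference can be made visible by recording what the calling thread does while a reply is still outstanding. The sketch below is an illustration only: it uses the `Future`-returning form of `AsynchronousSocketChannel.read` rather than the `CompletionHandler` form used in the Asynchronous section, and the class name `AsyncVsSyncDemo` is made up.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncVsSyncDemo {

    /** Returns the order of events as seen by the calling thread. */
    static List<String> demo() throws Exception {
        List<String> events = new ArrayList<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Future<AsynchronousSocketChannel> accepted = server.accept();
            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                try (AsynchronousSocketChannel peer = accepted.get(5, TimeUnit.SECONDS)) {
                    ByteBuffer bb = ByteBuffer.allocate(16);

                    // Start the read; the call returns at once, nothing has been sent yet.
                    Future<Integer> reply = client.read(bb);
                    events.add("read started, done=" + reply.isDone());

                    // The thread is free to run later code while the read is pending.
                    events.add("other work");

                    peer.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)))
                            .get(5, TimeUnit.SECONDS);
                    int n = reply.get(5, TimeUnit.SECONDS); // collect the result afterwards
                    events.add("reply bytes=" + n);
                }
            }
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        demo().forEach(System.out::println);
    }
}
```

With a blocking read, or with a read or select loop, the "other work" line could not run until the reply had arrived.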

Socket: blocking

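// Imports assume Lombok, commons-io and commons-lang3 are on the classpath.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SocketServer {

    /** Example: send one message, receive one message. */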
    public static void main(String[] args) {
        boolean _accept = true;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
        try (ServerSocket server = new ServerSocket(20006)) {

            System.out.println("Waiting for client connections...");
            while (_accept) {
                Socket _socket = server.accept();
                fixedThreadPool.execute(() -> {
                    try (Socket socket = _socket;
                            OutputStream output = socket.getOutputStream();
                            BufferedReader input = new BufferedReader(new InputStreamReader(
                                    socket.getInputStream(), StandardCharsets.UTF_8))) {
                        boolean _write = true;
                        log.error("Client connected: {}", socket.getPort());
                        socket.setSoTimeout(1000 * 30);

                        // send a message
                        String msg = "hello";
                        output.write(
                                (msg + IOUtils.LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8));
                        log.error("f:{}, t:{}, {}", "server", socket.getPort(), msg);

                        while (_write) {
                            msg = input.readLine();
                            if (msg != null) {// message received
                                log.error("f:{}, t:{}, {}", socket.getPort(), "server", msg);

                                if (_write) {
                                    // send a message
                                    msg = RandomStringUtils.randomAlphabetic(3);
                                    output.write((msg + IOUtils.LINE_SEPARATOR)
                                            .getBytes(StandardCharsets.UTF_8));
                                    log.error("f:{}, t:{}, {}", "server", socket.getPort(), msg);
                                }
                            } else {// end of stream reached
                                _write = false;
                            }

                            // do other thing after read-block
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            fixedThreadPool.shutdown();
        }
        log.error("done! or wait threads...");
    }

}
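
// Imports assume Lombok, commons-io and commons-lang3 are on the classpath.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SocketClient {

    /** Example: send one message, receive one message. */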
    public static void main(String[] args) {
        boolean _write = true;
        try (Socket client = new Socket("127.0.0.1", 20006);

                OutputStream output = client.getOutputStream();
                BufferedReader input = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            while (_write) {
                String msg = input.readLine();
                if (msg != null) {// message received
                    log.error("f:{}, t:{}, {}", "server", client.getLocalPort(), msg);

                    if (_write) {
                        // send a message
                        msg = RandomStringUtils.randomAlphabetic(3);
                        output.write(
                                (msg + IOUtils.LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8));
                        log.error("f:{}, t:{}, {}", client.getLocalPort(), "server", msg);
                    }
                } else {// end of stream reached
                    _write = false;
                }

                // do other thing after read-block
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
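
The summary mentions setSoTimeout as the way out of a read that blocks forever. As a rough illustration, the sketch below shows a blocking read giving up with `SocketTimeoutException` and the socket remaining usable afterwards. The class name `ReadTimeoutDemo` and the 200 ms value are arbitrary choices for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

class ReadTimeoutDemo {

    /** Returns true if the read timed out and a message could still be sent afterwards. */
    static boolean demo() throws IOException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
                Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
                Socket peer = server.accept()) {
            client.setSoTimeout(200); // wait at most 200 ms for a reply
            InputStream in = client.getInputStream();
            try {
                in.read(); // blocks: the peer never sends anything
                return false;
            } catch (SocketTimeoutException e) {
                // A timeout does not close the socket, so we can send again.
                client.getOutputStream().write('?');
                return peer.getInputStream().read() == '?';
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

In SocketServer above, the same exception is an IOException, so a client that stays silent for 30 seconds ends its session through the catch block.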

Channel: non-blocking (can also block)

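// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ChannelServer {

    /** Example: send one message, receive one message. */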
    public static void main(String[] args) {
        boolean _accept = true;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(20006));
            // ssc.configureBlocking(false);

            System.out.println("Waiting for client connections...");
            while (_accept) {
                SocketChannel _channel = ssc.accept();
                if (_channel != null) {// null is possible only when ssc is non-blocking
                    fixedThreadPool.execute(() -> {
                        try (SocketChannel channel = _channel) {
                            boolean _write = true;
                            log.error("Client connected: {}", channel.getRemoteAddress());
                            channel.configureBlocking(false);
                            // channel.socket().setSoTimeout(1000 * 30);

                            // send a message
                            String msg = "hello";
                            channel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                            log.error("f:{}, t:{}, {}", "server", channel.getRemoteAddress(), msg);

                            while (_write) {
                                for (int read = 0; read == 0;) {
                                    ByteBuffer bb = ByteBuffer.allocate(1024);
                                    if ((read = channel.read(bb)) > 0) {// message received
                                        msg = new String(bb.array(), 0, read,
                                                StandardCharsets.UTF_8);
                                        log.error("f:{}, t:{}, {}", channel.getRemoteAddress(),
                                                "server", msg);

                                        if (_write) {
                                            // send a message
                                            msg = RandomStringUtils.randomAlphabetic(3);
                                            channel.write(ByteBuffer
                                                    .wrap(msg.getBytes(StandardCharsets.UTF_8)));
                                            log.error("f:{}, t:{}, {}", "server",
                                                    channel.getRemoteAddress(), msg);
                                        }
                                    } else if (read < 0) {// end of stream reached
                                        _write = false;
                                    } else if (_write) {
                                        // write as you wish
                                    }
                                }

                                // do other thing after read-for
                            }
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            fixedThreadPool.shutdown();
        }
        log.error("done! or wait threads...");
    }

}
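
// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ChannelClient {

    /** Example: send one message, receive one message. */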
    public static void main(String[] args) {
        boolean _write = true;
        try (SocketChannel client = SocketChannel
                .open(new InetSocketAddress("127.0.0.1", 20006))) {
            client.configureBlocking(false);

            while (_write) {
                for (int read = 0; read == 0;) {
                    ByteBuffer bb = ByteBuffer.allocate(1024);
                    if ((read = client.read(bb)) > 0) {// message received
                        String msg = new String(bb.array(), 0, read, StandardCharsets.UTF_8);
                        log.error("f:{}, t:{}, {}", "server", client.getLocalAddress(), msg);

                        if (_write) {
                            // send a message
                            msg = RandomStringUtils.randomAlphabetic(3);
                            client.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                            log.error("f:{}, t:{}, {}", client.getLocalAddress(), "server", msg);
                        }
                    } else if (read < 0) {// end of stream reached
                        _write = false;
                    } else if (_write) {
                        // write as you wish
                    }
                }

                // do other thing after read-for
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
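
The Channel examples call write once per message, which is fine for a few bytes. In non-blocking mode, though, a socket channel writes no more than fits into the socket's send buffer, so a larger message needs a loop. Below is a minimal sketch of such a loop, assuming a loopback connection; `WriteFullyDemo` and `writeFully` are hypothetical names, not something from the original code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class WriteFullyDemo {

    /** Hypothetical helper: keeps writing until the buffer has been drained. */
    static void writeFully(SocketChannel ch, ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            if (ch.write(src) == 0) {
                Thread.yield(); // send buffer is full; try again shortly
            }
        }
    }

    /** Sends size bytes over a non-blocking channel and returns how many were received. */
    static long demo(int size) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel sender = SocketChannel.open(server.getLocalAddress());
                    SocketChannel receiver = server.accept()) {
                sender.configureBlocking(false);
                long[] received = {0};
                Thread reader = new Thread(() -> {
                    ByteBuffer bb = ByteBuffer.allocate(8192);
                    try {
                        int n;
                        // receiver is blocking: read returns -1 at end of stream
                        while ((n = receiver.read(bb)) > 0) {
                            received[0] += n;
                            bb.clear();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                reader.start();
                writeFully(sender, ByteBuffer.allocate(size));
                sender.close(); // signals end of stream to the reader
                reader.join();
                return received[0];
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(4 * 1024 * 1024));
    }
}
```

Spinning on a full send buffer wastes CPU; registering the channel for OP_WRITE with a Selector is the usual way to wait instead.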

Selector: multiplexing

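// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SelectorServer {

    /** Example: send one message, receive one message. */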
    public static void main(String[] args) {
        boolean _select = true;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
                Selector selector = Selector.open()) {
            ssc.bind(new InetSocketAddress(20006));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Waiting for client connections...");
            while (_select) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();

                    fixedThreadPool.execute(() -> {
                        try {
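                            if (sk.isAcceptable()) {
                                ServerSocketChannel _ssc = (ServerSocketChannel) sk.channel();
                                SocketChannel channel = _ssc.accept();
                                if (channel == null) {
                                    // non-blocking accept: the key was selected again and
                                    // another task already took this connection
                                    return;
                                }

                                log.error("Client connected: {}", channel.getRemoteAddress());
                                channel.configureBlocking(false);
                                // note: OP_WRITE is ready almost all the time, so select()
                                // returns immediately while it is registered
                                channel.register(selector,
                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE);

                                // send a message
                                String msg = "hello";
                                channel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                                log.error("f:{}, t:{}, {}", "server", channel.getRemoteAddress(), msg);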
                            } else {
                                boolean _write = true;
                                SocketChannel channel = (SocketChannel) sk.channel();
                                if (sk.isReadable()) {
                                    ByteBuffer bb = ByteBuffer.allocate(1024);
                                    int read = channel.read(bb);
                                    if (read > 0) {// message received
                                        String msg = new String(bb.array(), 0, read,
                                                StandardCharsets.UTF_8);
                                        log.error("f:{}, t:{}, {}", channel.getRemoteAddress(),
                                                "server", msg);

                                        if (_write) {
                                            // send a message
                                            msg = RandomStringUtils.randomAlphabetic(3);
                                            channel.write(ByteBuffer
                                                    .wrap(msg.getBytes(StandardCharsets.UTF_8)));
                                            log.error("f:{}, t:{}, {}", "server",
                                                    channel.getRemoteAddress(), msg);
                                        }
                                        return;
                                    } else if (read < 0) {// end of stream (0 just means no data)
                                        _write = false;
                                    }
                                }
                                if (_write) {
                                    if (sk.isWritable()) {
                                        // write as you wish
                                    }
                                } else {
                                    sk.cancel();
                                    channel.close();
                                }
                            }
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
                }

                // do other thing after select-block
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            fixedThreadPool.shutdown();
        }
        log.error("done! or wait threads...");
    }

}
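
// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SelectorClient {

    /** Example: send one message, receive one message. */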
    public static void main(String[] args) {
        boolean _write = true;// only one channel
        try (SocketChannel _client = SocketChannel
                .open(new InetSocketAddress("127.0.0.1", 20006));
                Selector selector = Selector.open()) {
            _client.configureBlocking(false);
            _client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

            while (_write) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();

                    SocketChannel client = (SocketChannel) sk.channel();
                    if (sk.isReadable()) {
                        ByteBuffer bb = ByteBuffer.allocate(1024);
                        int read = client.read(bb);
                        if (read > 0) {// message received
                            String msg = new String(bb.array(), 0, read, StandardCharsets.UTF_8);
                            log.error("f:{}, t:{}, {}", "server", client.getLocalAddress(), msg);

                            if (_write) {
                                // send a message
                                msg = RandomStringUtils.randomAlphabetic(3);
                                client.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                                log.error("f:{}, t:{}, {}", client.getLocalAddress(), "server", msg);
                            }
                            continue;
                        } else if (read < 0) {// end of stream (0 just means no data)
                            _write = false;
                        }
                    }
                    if (_write) {
                        if (sk.isWritable()) {
                            // write as you wish
                        }
                    } else {
                        sk.cancel();
                        client.close();
                    }
                }

                // do other thing after select-block
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
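
SelectorServer hands each selected key to a thread pool while the selecting thread keeps running, so the same key can be selected again before its task has finished. A common alternative is to do all the work on the selecting thread. The sketch below shows that variant as a plain echo server. It is an illustration under assumptions, not the original design: `SingleThreadSelectorEcho` is a made-up name, it registers OP_READ only, and it stops after a fixed number of clients so that it can be run as a test.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SingleThreadSelectorEcho {

    /** Echoes on one thread until clientsToServe connections were closed by their peers. */
    static void serve(ServerSocketChannel ssc, int clientsToServe) throws IOException {
        try (Selector selector = Selector.open()) {
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            int finished = 0;
            while (finished < clientsToServe) {
                selector.select(); // blocks until at least one channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel ch = ssc.accept();
                        if (ch != null) { // accept() may return null in non-blocking mode
                            ch.configureBlocking(false);
                            // OP_READ only: a socket is writable almost all the time
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        ByteBuffer bb = ByteBuffer.allocate(1024);
                        if (ch.read(bb) < 0) { // peer closed the connection
                            key.cancel();
                            ch.close();
                            finished++;
                        } else {
                            bb.flip();
                            while (bb.hasRemaining()) {
                                ch.write(bb); // echo the bytes back
                            }
                        }
                    }
                }
            }
        }
    }

    /** Starts the server on a free loopback port, sends "ping", returns the echoed reply. */
    static String demo() throws Exception {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            int port = ((InetSocketAddress) ssc.getLocalAddress()).getPort();
            Thread server = new Thread(() -> {
                try {
                    serve(ssc, 1);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();
            byte[] buf = new byte[4];
            int off = 0;
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), port)) {
                client.getOutputStream().write("ping".getBytes(StandardCharsets.UTF_8));
                InputStream in = client.getInputStream();
                int n;
                while (off < buf.length && (n = in.read(buf, off, buf.length - off)) > 0) {
                    off += n;
                }
            }
            server.join(); // the server stops once the client has disconnected
            return new String(buf, 0, off, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Keeping everything on one thread avoids the races, at the cost that a slow handler delays every other session.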

Signal driven: unknown (not covered)

Asynchronous

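// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j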
public class AsynchronousServer {

    public static void main(String[] args) {
        boolean _accept = true;
        // use default group with defaultThreadPool
        try (AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open()) {
            assc.bind(new InetSocketAddress(20006));

            System.out.println("Waiting for client connections...");
            assc.accept(Boolean.TRUE, new CompletionHandler<AsynchronousSocketChannel, Boolean>() {
                @Override
                public void completed(AsynchronousSocketChannel channel, Boolean _write) {
                    try {
                        log.error("Client connected: {}", channel.getRemoteAddress());

                        // send a message
                        String msg = "hello";
                        channel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                        log.error("f:{}, t:{}, {}", "server", channel.getRemoteAddress(), msg);

                        if (_write) {
                            ByteBuffer bb = ByteBuffer.allocate(1024);
                            channel.read(bb, _write, new CompletionHandler<Integer, Boolean>() {
                                @Override
                                public void completed(Integer read, Boolean _write) {
                                    try {
                                        if (read > 0) {// message received
                                            String msg = new String(bb.array(), 0, read,
                                                    StandardCharsets.UTF_8);
                                            log.error("f:{}, t:{}, {}", channel.getRemoteAddress(),
                                                    "server", msg);

                                            if (_write) {
                                                // send a message
                                                msg = RandomStringUtils.randomAlphabetic(3);
                                                channel.write(ByteBuffer
                                                        .wrap(msg.getBytes(StandardCharsets.UTF_8)));
                                                log.error("f:{}, t:{}, {}", "server",
                                                        channel.getRemoteAddress(), msg);
                                            }
                                        } else {// end of stream reached
                                            _write = Boolean.FALSE;
                                        }
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    } finally {
                                        if (_write) {
                                            bb.clear();
                                            channel.read(bb, _write, this);
                                        }
                                    }
                                }

                                @Override
                                public void failed(Throwable exc, Boolean _write) {
                                    exc.printStackTrace();
                                }
                            });
                        }

                        // write CompletionHandler && channel.close
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } finally {
                        if (_accept) {
                            assc.accept(Boolean.TRUE, this);
                        }
                    }
                }

                @Override
                public void failed(Throwable exc, Boolean _write) {
                    exc.printStackTrace();
                }
            });

            // do other thing
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
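
// Imports assume Lombok and commons-lang3 are on the classpath.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang3.RandomStringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j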
public class AsynchronousClient {

    public static void main(String[] args) {
        // use default group with defaultThreadPool
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            client.connect(new InetSocketAddress("127.0.0.1", 20006), Boolean.TRUE,

                    new CompletionHandler<Void, Boolean>() {
                        @Override
                        public void completed(Void result, Boolean _write) {
                            if (_write) {
                                ByteBuffer bb = ByteBuffer.allocate(1024);
                                client.read(bb, _write, new CompletionHandler<Integer, Boolean>() {
                                    @Override
                                    public void completed(Integer read, Boolean _write) {
                                        try {
                                            if (read > 0) {// message received
                                                String msg = new String(bb.array(), 0, read,
                                                        StandardCharsets.UTF_8);
                                                log.error("f:{}, t:{}, {}", "server",
                                                        client.getLocalAddress(), msg);

                                                if (_write) {
                                                    // send a message
                                                    msg = RandomStringUtils.randomAlphabetic(3);
                                                    client.write(ByteBuffer.wrap(
                                                            msg.getBytes(StandardCharsets.UTF_8)));
                                                    log.error("f:{}, t:{}, {}",
                                                            client.getLocalAddress(), "server", msg);
                                                }
                                            } else {// end of stream reached
                                                _write = Boolean.FALSE;
                                            }
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        } finally {
                                            if (_write) {
                                                bb.clear();
                                                client.read(bb, _write, this);
                                            }
                                        }
                                    }

                                    @Override
                                    public void failed(Throwable exc, Boolean _write) {
                                        exc.printStackTrace();
                                    }
                                });
                            }

                            // write CompletionHandler && channel.close
                        }

                        @Override
                        public void failed(Throwable exc, Boolean _write) {
                            exc.printStackTrace();
                        }
                    });

            // do other thing
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}