Skip to content

Instantly share code, notes, and snippets.

@nshiba
Created December 5, 2017 06:47
Show Gist options
  • Save nshiba/9f2c7b37f018341a5ee1163060fff819 to your computer and use it in GitHub Desktop.
Save nshiba/9f2c7b37f018341a5ee1163060fff819 to your computer and use it in GitHub Desktop.
2017 ネトプロ 応用課題 参考実装
package netprog12.networking.httpd;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class ChannelEchoClient2 {
public static final int ECHO_PORT = 10007;
public static void main(String[] args) {
ChannelEchoClient2 client = new ChannelEchoClient2();
client.connect();
}
private void connect() {
try {
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", ECHO_PORT));
System.out.println(channel.socket().getRemoteSocketAddress() + ": 接続しました");
System.out.println("文字を入力しEnterで送信します");
new Thread(new ClientSenderTask(channel)).start();
new Thread(new ClientReaderTask(channel)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
package netprog12.networking.httpd;
import com.sun.istack.internal.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.CharBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;
/*
従来の java.net パッケージを使用した入出力では、ソケットの accept メソッドや read メソッドなどを呼び出すと接続や入力があるまで処理が待ち状態になりました。
このような入出力待ちの動作のことをブロックといいます。
接続の待ちうけでブロックが発生するため、複数のネットワーク接続を同時に処理するサーバアプリケーションを実装するにはマルチスレッドを利用する必要がありました。
しかしスレッドの生成はそれなりにコストのかかる処理であり、アクセスの多いサーバではその影響が無視できないくらい大きくなります。
そこで NIO ではブロックの発生しない入出力を実現する方法が提供されました。
ブロックされない入出力を利用すると、1つのスレッドでも複数の入出力を見かけ上同時に処理することができるようになります。
NIO でネットワーク入出力を行うためには SocketChannel や ServerSocketChannel を利用します。
これらはそれぞれ、java.io パッケージの Socket や ServerSocket に相当します。
これらのクラスはブロックする入出力とブロックしない入出力のどちらも利用することができます。
*/
public class ChannelEchoServer2 implements ServerInteraction {
private static final int ECHO_PORT = 10007;
private Set<SocketChannel> channels = new HashSet<>();
private Charset charset = Charset.forName("UTF-8");
public static void main(String[] args) {
new ChannelEchoServer2().run();
}
public void run() {
ServerSocketChannel serverChannel = null;
try {
serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(ECHO_PORT));
System.out.println("ChannelEchoServerが起動しました(port=" + serverChannel.socket().getLocalPort() + ")");
loop(serverChannel);
} catch (IOException e) {
e.printStackTrace();
} finally {
closeServer(serverChannel);
}
}
private void loop(ServerSocketChannel serverChannel) throws IOException {
while (true) {
showMessage("waiting accept()");
SocketChannel channel = serverChannel.accept();
SocketAddress address = channel.socket().getRemoteSocketAddress();
showMessage(address + ": 接続されました");
register(channel);
ServerReaderTask reader = new ServerReaderTask(channel,this);
new Thread(reader).start();
}
}
private void closeServer(ServerSocketChannel serverChannel) {
if (serverChannel == null || !serverChannel.isOpen()) {
return;
}
try {
System.out.println();
showMessage("ChannelEchoServerを停止します。");
serverChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void showMessage(String message) {
System.out.println(message);
System.out.println("-----------------");
}
private void register(SocketChannel channel) {
channels.add(channel);
}
private void unregister(@Nullable SocketChannel channel) {
if (channel == null) {
return;
}
showMessage(channel.socket().getRemoteSocketAddress() + " is closed");
if (channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
channels.remove(channel);
}
private void send(String input) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("receive message: ").append(input).append("\n");
messageBuilder.append("send list: \n");
for (SocketChannel channel : channels) {
if (!channel.isConnected()) {
unregister(channel);
continue;
}
ServerSenderTask sender = new ServerSenderTask(channel, charset.encode(CharBuffer.wrap(input)), this);
new Thread(sender).start();
messageBuilder.append(channel.socket().getRemoteSocketAddress()).append("\n");
}
showMessage(messageBuilder.toString());
}
@Override
public void brokenPipeChannel(SocketChannel channel) {
unregister(channel);
}
@Override
public void replyAll(String message) {
send(message);
}
}
package netprog12.networking.httpd;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ClientReaderTask implements Runnable {
private static final int BUF_SIZE = 1000;
private Charset charset = Charset.forName("UTF-8");
private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
private SocketChannel channel;
public ClientReaderTask(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
while (channel.isConnected()) {
buf.clear();
if (channel.read(buf) < 0) {
continue;
}
buf.flip();
System.out.println("受信:" + charset.decode(buf).toString());
}
} catch (IOException e) {
e.printStackTrace();
return;
}
}
}
package netprog12.networking.httpd;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ClientSenderTask implements Runnable {
private Charset charset = Charset.forName("UTF-8");
private SocketChannel channel;
private BufferedReader keyin;
public ClientSenderTask(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
while (channel.isConnected()) {
keyin = new BufferedReader(new InputStreamReader(System.in));
String line = keyin.readLine();
System.out.println("送信:" + line);
channel.write(charset.encode(CharBuffer.wrap(line + "\n")));
}
} catch (IOException e) {
e.printStackTrace();
return;
}
}
}
package netprog12.networking.httpd;
import java.nio.channels.SocketChannel;
public interface ServerInteraction {
public void brokenPipeChannel(SocketChannel channel);
public void replyAll(String message);
}
package netprog12.networking.httpd;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ServerReaderTask implements Runnable {
private static final int BUF_SIZE = 1000;
private SocketChannel channel = null;
private ServerInteraction interaction;
public ServerReaderTask(SocketChannel channel, ServerInteraction interaction) {
this.channel = channel;
this.interaction = interaction;
}
public void run() {
while (true) {
String remoteAddress = channel.socket()
.getRemoteSocketAddress()
.toString();
try {
String input = readMessage();
if (input.isEmpty()) {
continue;
}
String message = remoteAddress + ": " + input;
interaction.replyAll(message);
} catch (IOException e) {
e.printStackTrace();
interaction.brokenPipeChannel(channel);
return;
}
}
}
private String readMessage() throws IOException {
ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
Charset charset = Charset.forName("UTF-8");
if (channel.read(buf) < 0) {
return "";
}
//flip()は、新しい一連のチャネル読込み操作または相対「get」操作のためにバッファを準備します。リミットの値を現在位置の値に合わせたあと、位置の値をゼロにします。
buf.flip();
return charset.decode(buf).toString();
}
}
package netprog12.networking.httpd;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class ServerSenderTask implements Runnable {
private SocketChannel channel;
private ByteBuffer buf;
private ServerInteraction interaction;
public ServerSenderTask(SocketChannel channel, ByteBuffer buf, ServerInteraction interaction) {
this.channel = channel;
this.buf = buf;
this.interaction = interaction;
}
@Override
public void run() {
try {
channel.write(buf);
} catch (IOException e) {
e.printStackTrace();
interaction.brokenPipeChannel(channel);
}
}
}
@nshiba
Copy link
Author

nshiba commented Dec 5, 2017

講義でも言いましたが、切断する/した/された時の処理をしっかり実装していないので気をつけてください

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment