Created
November 30, 2019 14:15
-
-
Save bartosz25/7327f36f256556f54a43e5e67317f6a4 to your computer and use it in GitHub Desktop.
selector-nio-example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.waitingforcode | |
import java.nio.ByteBuffer | |
import java.nio.channels.SocketChannel | |
import java.nio.charset.StandardCharsets | |
/**
 * Helpers for moving UTF-8 text between a `SocketChannel` and a reusable `ByteBuffer`.
 *
 * Both helpers clear the buffer on entry and on exit, so whatever it held before is discarded.
 */
object ByteBufferIOOps {

  /**
   * Encodes `textToWrite` as UTF-8 and writes all of it to `channel` through `buffer`.
   *
   * The text is encoded explicitly as UTF-8 (not with the platform default charset) so that it
   * always matches the UTF-8 decoding performed by [[read]].
   *
   * @param channel     channel to write to
   * @param textToWrite text to send; its encoded form has to fit into `buffer`
   *                    (otherwise `buffer.put` fails)
   * @param buffer      scratch buffer, cleared before and after the write
   * @return the result of the final `buffer.clear()` call
   */
  def writeToChannel(channel: SocketChannel, textToWrite: String, buffer: ByteBuffer) = {
    buffer.clear()
    buffer.put(textToWrite.getBytes(StandardCharsets.UTF_8))
    buffer.flip()
    while (buffer.hasRemaining) {
      val bytesWritten = channel.write(buffer)
      // fail-fast: a write that makes no progress is treated as an error instead of being retried
      assert(bytesWritten > 0)
    }
    buffer.clear()
  }

  /**
   * Performs a single read from `channel` into `buffer` and decodes the received bytes as UTF-8.
   *
   * @param channel channel to read from
   * @param buffer  scratch buffer, cleared before and after the read
   * @return the decoded text; empty when the read delivered no bytes
   */
  def read(channel: SocketChannel, buffer: ByteBuffer): String = {
    buffer.clear()
    val readBytes = channel.read(buffer)
    assert(readBytes > -1, "Channel has been closed") // fail-fast just for easier testing
    buffer.flip()
    // Decode straight from the buffer (position..limit) instead of going through buffer.array():
    // the backing array is not accessible for direct/read-only buffers and may start at a
    // non-zero arrayOffset().
    val text = StandardCharsets.UTF_8.decode(buffer).toString
    buffer.clear()
    text
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Non-blocking NIO client that opens three connections to `serverSocketAddress` and then, for
 * each connection, alternates between reading a message from the server and answering it.
 *
 * All connections are multiplexed on a single selector and served by the thread calling [[run]].
 *
 * @param serverSocketAddress address of the server to connect to
 */
class SocketClient(serverSocketAddress: InetSocketAddress) extends Runnable {

  /** Starts the connections and runs the selector loop; the loop has no exit condition. */
  override def run(): Unit = {
    val clientSelector = Selector.open()
    // sending can be also implemented with the help of a
    // queue ==> https://stackoverflow.com/questions/14249353/nio-blocking-write-not-working
    (0 to 2).foreach { id =>
      val channelSend = SocketChannel.open()
      channelSend.configureBlocking(false)
      // the client id is attached at registration time so it is available as soon as the key exists
      val connectKey =
        channelSend.register(clientSelector, SelectionKey.OP_CONNECT, s"client#${id}")
      // A non-blocking connect can complete immediately (e.g. for a local connection). In that
      // case the connect event may never be reported, so the key is switched to reads right away.
      if (channelSend.connect(serverSocketAddress)) {
        onConnected(connectKey)
      }
    }
    val buffer = ByteBuffer.allocate(256)
    while (true) {
      // block until at least one channel is ready instead of spinning on selectNow()
      clientSelector.select()
      val selectedKeys = clientSelector.selectedKeys().iterator()
      while (selectedKeys.hasNext) {
        val key = selectedKeys.next()
        if (key.isConnectable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val isConnected = channel.finishConnect()
          assert(isConnected, "Client should correctly handle connect")
          onConnected(key)
        } else if (key.isWritable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val clientId = key.attachment().toString
          val textToWrite = s"Hello from the ${clientId}"
          logClient(s"${clientId} Writing ${textToWrite}")
          ByteBufferIOOps.writeToChannel(channel, textToWrite, buffer)
          // answer sent: wait for the next message from the server
          key.interestOps(SelectionKey.OP_READ)
        } else if (key.isReadable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val inputText = ByteBufferIOOps.read(channel, buffer)
          logClient(s"${key.attachment().toString} Got message from the server: '${inputText}'")
          // message received: reply on the next writable event
          key.interestOps(SelectionKey.OP_WRITE)
        }
        // selected keys have to be removed by hand, otherwise they are handled again next round
        selectedKeys.remove()
      }
    }
  }

  /** Reports the established connection and subscribes the key to read events only. */
  private def onConnected(key: SelectionKey): Unit = {
    val clientId = key.attachment().toString
    println(s"${clientId} was correctly connected to the server")
    // subscribe to reads - other way to do, if you want to keep already existent
    // operations, is to call (key.interestOps(key.interestOps() | SelectionKey.OP_READ))
    key.interestOps(SelectionKey.OP_READ)
  }

  /** Prints `text` prefixed with "Client: ". */
  private def logClient(text: String) = println(s"Client: ${text}")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.waitingforcode | |
import java.net.InetSocketAddress | |
import java.nio.ByteBuffer | |
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} | |
/**
 * Single-threaded NIO server demo: listens on localhost:4444, starts a [[SocketClient]] in a
 * separate thread and then exchanges text messages with every accepted connection.
 */
object SocketServer {

  /**
   * Binds the server channel, launches the client thread and runs the polling loop.
   * The loop has no exit condition.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val serverAddress = new InetSocketAddress("localhost", 4444)
    val serverSelector = Selector.open()
    val server = ServerSocketChannel.open()
    server.bind(serverAddress)
    server.configureBlocking(false)
    server.register(serverSelector, SelectionKey.OP_ACCEPT)
    new Thread(new SocketClient(serverAddress)).start()
    // we can use a single buffer because we've a single thread here
    val buffer = ByteBuffer.allocate(256)
    def logServer(textToLog: String) =
      println(Console.BLUE + s"Server: ${textToLog}" + Console.RESET)

    while (true) {
      serverSelector.selectNow()
      val selectedKeys = serverSelector.selectedKeys().iterator()
      while (selectedKeys.hasNext) {
        val key = selectedKeys.next
        if (key.isAcceptable) {
          val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
          // accept() on a non-blocking server channel returns null when no connection is
          // pending, so the result is wrapped instead of being dereferenced directly
          Option(serverSocketChannel.accept()).foreach { socketChannel =>
            logServer(s"accepted connection from ${socketChannel.getRemoteAddress}")
            socketChannel.configureBlocking(false)
            socketChannel.register(serverSelector, SelectionKey.OP_WRITE)
            // If I don't write, the keys from the client are always key 1 vs 8 (so READ is not ready!)
            val textToWrite = "Hello world"
            logServer(s"Writing '${textToWrite}'")
            ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
          }
        } else if (key.isReadable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val inputText = ByteBufferIOOps.read(channel, buffer)
          logServer(s"Reading '${inputText}'")
          // message received: answer on the next writable event
          key.interestOps(SelectionKey.OP_WRITE)
        } else if (key.isWritable) {
          val socketChannel = key.channel().asInstanceOf[SocketChannel]
          val textToWrite = "Hello from server"
          logServer(s"Writing '${textToWrite}'")
          ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
          // message sent: wait for the client's reply
          key.interestOps(SelectionKey.OP_READ)
        }
        // selected keys have to be removed by hand, otherwise they are handled again next round
        selectedKeys.remove()
      }
      // pause for one second between polling rounds
      Thread.sleep(1000L)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment