Skip to content

Instantly share code, notes, and snippets.

@bartosz25
Created November 30, 2019 14:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bartosz25/7327f36f256556f54a43e5e67317f6a4 to your computer and use it in GitHub Desktop.
Save bartosz25/7327f36f256556f54a43e5e67317f6a4 to your computer and use it in GitHub Desktop.
selector-nio-example
package com.waitingforcode
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, SocketChannel}
import java.nio.charset.StandardCharsets
/**
 * Text-over-socket helpers shared by the demo client and server.
 *
 * Both directions use UTF-8, so a string written with [[writeToChannel]] is decoded
 * unchanged by [[read]] regardless of the platform default charset.
 */
object ByteBufferIOOps {

  /**
   * Encodes `textToWrite` as UTF-8 and writes all of it to `channel`.
   *
   * `buffer` is only scratch space: its previous content is discarded and it is left
   * cleared on return. Text longer than the buffer is sent in buffer-sized chunks
   * instead of overflowing the buffer.
   *
   * @param channel     destination channel
   * @param textToWrite text to send
   * @param buffer      reusable scratch buffer
   * @return the cleared scratch buffer (callers in this file ignore it)
   */
  def writeToChannel(channel: SocketChannel, textToWrite: String, buffer: ByteBuffer) = {
    // Explicit charset: read() decodes with UTF-8, so encoding must not depend on the platform default.
    val payload = textToWrite.getBytes(StandardCharsets.UTF_8)
    // max(1, ...) keeps grouped() legal for a zero-capacity buffer; put() then fails as it did before.
    payload.grouped(math.max(1, buffer.capacity())).foreach { chunk =>
      buffer.clear()
      buffer.put(chunk)
      buffer.flip()
      while (buffer.hasRemaining) {
        val bytesWritten = channel.write(buffer)
        // Fail-fast kept from the original demo: a write that makes no progress is treated
        // as an error rather than waiting for the channel to become writable again.
        assert(bytesWritten > 0, "Channel accepted no bytes")
      }
    }
    buffer.clear()
  }

  /**
   * Performs a single read from `channel` and decodes the received bytes as UTF-8.
   *
   * `buffer` is only scratch space: its previous content is discarded and it is left
   * cleared on return. At most `buffer.capacity()` bytes are consumed per call, so a
   * longer message (or a multi-byte character split across reads) needs several calls.
   *
   * @param channel source channel
   * @param buffer  reusable scratch buffer (heap or direct)
   * @return the decoded text; empty when no bytes were available
   */
  def read(channel: SocketChannel, buffer: ByteBuffer): String = {
    buffer.clear()
    val readBytes = channel.read(buffer)
    assert(readBytes > -1, "Channel has been closed") // fail-fast just for easier testing
    buffer.flip()
    // Copy through the buffer API instead of buffer.array(): array() is unsupported for
    // direct/read-only buffers and ignores arrayOffset for sliced ones.
    val received = new Array[Byte](buffer.remaining())
    buffer.get(received)
    buffer.clear()
    new String(received, StandardCharsets.UTF_8)
  }
}
/**
 * Demo NIO client: opens three non-blocking connections to `serverSocketAddress` and
 * multiplexes them on a single selector, alternating between reading a message from the
 * server and answering with a greeting.
 *
 * The loop runs until the executing thread is interrupted or an I/O operation fails;
 * in both cases every registered channel and the selector are closed.
 *
 * @param serverSocketAddress address of the server to connect to
 */
class SocketClient(serverSocketAddress: InetSocketAddress) extends Runnable {

  override def run(): Unit = {
    val clientSelector = Selector.open()
    try {
      // sending can be also implemented with the help of a
      // queue ==> https://stackoverflow.com/questions/14249353/nio-blocking-write-not-working
      (0 to 2).foreach { id =>
        val channelSend = SocketChannel.open()
        channelSend.configureBlocking(false)
        // Register with the attachment in one call so the key never exists without its client id.
        channelSend.register(clientSelector, SelectionKey.OP_CONNECT, s"client#${id}")
        channelSend.connect(serverSocketAddress)
      }
      // A single buffer is enough: all channels are served by this one thread.
      val buffer = ByteBuffer.allocate(256)
      while (!Thread.currentThread().isInterrupted) {
        // Block until at least one channel is ready instead of spinning on selectNow().
        // select() returns immediately once the thread is interrupted, hence the loop condition.
        clientSelector.select()
        val selectedKeys = clientSelector.selectedKeys().iterator()
        while (selectedKeys.hasNext) {
          val key = selectedKeys.next()
          if (key.isConnectable) {
            val channel = key.channel().asInstanceOf[SocketChannel]
            val isConnected = channel.finishConnect()
            assert(isConnected, "Client should correctly handle connect")
            val clientId = key.attachment().toString
            println(s"${clientId} was correctly connected to the server")
            // subscribe to reads - other way to do, if you want to keep already existent
            // operations, is to call (key.interestOps(key.interestOps() | SelectionKey.OP_READ))
            key.interestOps(SelectionKey.OP_READ)
          } else if (key.isWritable) {
            val channel = key.channel().asInstanceOf[SocketChannel]
            val clientId = key.attachment().toString
            val textToWrite = s"Hello from the ${clientId}"
            logClient(s"${clientId} Writing ${textToWrite}")
            ByteBufferIOOps.writeToChannel(channel, textToWrite, buffer)
            // Answer sent: wait for the next message from the server.
            key.interestOps(SelectionKey.OP_READ)
          } else if (key.isReadable) {
            val channel = key.channel().asInstanceOf[SocketChannel]
            val inputText = ByteBufferIOOps.read(channel, buffer)
            logClient(s"${key.attachment().toString} Got message from the server: '${inputText}'")
            // Message received: switch to writing so the next iteration sends the reply.
            key.interestOps(SelectionKey.OP_WRITE)
          }
          // Handled keys must be removed by hand; the selector never clears the selected set itself.
          selectedKeys.remove()
        }
      }
    } finally {
      // Release the sockets first, then the selector, even when the loop ended with an exception.
      try {
        val registeredKeys = clientSelector.keys().iterator()
        while (registeredKeys.hasNext) {
          registeredKeys.next().channel().close()
        }
      } finally {
        clientSelector.close()
      }
    }
  }

  /** Prints `text` to standard output with a "Client: " prefix. */
  private def logClient(text: String) = println(s"Client: ${text}")
}
package com.waitingforcode
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
/**
 * Demo NIO server: listens on localhost:4444, starts a [[SocketClient]] in a background
 * thread and serves all accepted connections from a single selector loop, alternating
 * between writing a greeting and reading the client's answer.
 */
object SocketServer {

  /**
   * Runs the server loop. The loop only ends when an operation throws; the selector,
   * the listening channel and every accepted channel are closed on the way out.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val serverAddress = new InetSocketAddress("localhost", 4444)
    val serverSelector = Selector.open()
    try {
      val server = ServerSocketChannel.open()
      try {
        server.bind(serverAddress)
        server.configureBlocking(false)
        server.register(serverSelector, SelectionKey.OP_ACCEPT)
        new Thread(new SocketClient(serverAddress)).start()
        // we can use a single buffer because we've a single thread here
        val buffer = ByteBuffer.allocate(256)
        def logServer(textToLog: String) = println(Console.BLUE + s"Server: ${textToLog}" + Console.RESET)
        while (true) {
          serverSelector.selectNow()
          val selectedKeys = serverSelector.selectedKeys().iterator()
          while (selectedKeys.hasNext) {
            val key = selectedKeys.next
            if (key.isAcceptable) {
              val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
              // In non-blocking mode accept() returns null when no connection is pending,
              // so guard against it instead of dereferencing the result directly.
              Option(serverSocketChannel.accept()).foreach { socketChannel =>
                logServer(s"accepted connection from ${socketChannel.getRemoteAddress}")
                socketChannel.configureBlocking(false)
                socketChannel.register(serverSelector, SelectionKey.OP_WRITE)
                // If I don't write, the keys from the client are always key 1 vs 8 (so READ is not ready!)
                val textToWrite = "Hello world"
                logServer(s"Writing '${textToWrite}'")
                ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
              }
            } else if (key.isReadable) {
              val channel = key.channel().asInstanceOf[SocketChannel]
              val inputText = ByteBufferIOOps.read(channel, buffer)
              logServer(s"Reading '${inputText}'")
              // Client message consumed: answer on the next iteration.
              key.interestOps(SelectionKey.OP_WRITE)
            } else if (key.isWritable) {
              val socketChannel = key.channel().asInstanceOf[SocketChannel]
              val textToWrite = "Hello from server"
              logServer(s"Writing '${textToWrite}'")
              ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
              // Greeting sent: wait for the client's reply.
              key.interestOps(SelectionKey.OP_READ)
            }
            // Handled keys must be removed by hand; the selector never clears the selected set itself.
            selectedKeys.remove()
          }
          // Throttles the exchange to one round of selected keys per second.
          Thread.sleep(1000L)
        }
      } finally {
        // Close every accepted connection, then the listening channel.
        try {
          val registeredKeys = serverSelector.keys().iterator()
          while (registeredKeys.hasNext) {
            registeredKeys.next().channel().close()
          }
        } finally {
          server.close()
        }
      }
    } finally {
      serverSelector.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment