Skip to content

Instantly share code, notes, and snippets.

@bingli224
Last active October 21, 2018 08:21
Show Gist options
  • Save bingli224/94035a133c178114b8f77423a742ddf3 to your computer and use it in GitHub Desktop.
Save bingli224/94035a133c178114b8f77423a742ddf3 to your computer and use it in GitHub Desktop.
Socket+Selector+Lambda in Kotlin
/**
* Socket+Selector+Lambda in Kotlin
*
* Further issues:
* When the receiver takes action fast enough before data is fully delivered through SocketChannel,
* especially the original one is likely to be bigger than buffer.
*
* Also, what if the register() is repeated again?
*
* @author BingLi224
* @version 2018.10.21
*/
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import java.nio.channels.ServerSocketChannel
import java.nio.channels.Selector
import java.nio.channels.SelectionKey
import java.net.InetSocketAddress
import java.util.stream.StreamSupport
/**
* Listener connects to Speakers.
*
* @constructor Creates the Listener with a thread that is ready for reading new data from connected Speakers.
*/
class Listener {
// initialize selector
val selector = Selector.open ( )
init {
// create the thread for listening
Thread {
while ( true ) {
selector.select ( ) // check for the new incoming data from channels
val selectedKeys = selector.selectedKeys ( )
selectedKeys.parallelStream ( )
.filter { it.isReadable ( ) && // expect to read only
it.channel ( ) is SocketChannel } // expect the SocketChannel, casted from SelectableChannel
.map { it.channel ( ) as SocketChannel } // cast as SocketChannel
.forEach {
val input = ByteBuffer.allocate ( 1024 )
read@ while ( true ) {
// read the data, got the new size of data
val len = it.read ( input )
when {
len > 0 -> {
// buffer for extract data
val data = ByteArray ( len )
input.flip ( ) // back to the beginning of buffer
input.get ( data ) // convert the data to byte array
println ( "[$this] got: ${String ( data )}" )
input.flip ( ) // reset the buffer to put() next time
}
len == 0 -> break@read // len = 0
// end of data, so break out from reading loop
else -> {
// disconnected,
// so unregister from selector
it.keyFor ( selector )
.cancel ( )
break@read // break out from reading loop
}
}
}
}
// clean up the list of selected keys to check the next time
// if not reset the list, the channel without new data is still in the next list
selectedKeys.clear ( )
}
}.start ( )
}
/**
* Connects to Speaker with given socket port.
*
* @param id Socket port of the Speaker.
*/
fun addSpeaker ( id : Int ) {
// create the socket channel to Speaker channel
var socketChannel = SocketChannel.open (
InetSocketAddress ( "localhost", id )
)
socketChannel.configureBlocking ( false )
selector.wakeup ( ) // interrupt the selector.select()
// register the socket channel to the selector
socketChannel.register ( selector, SelectionKey.OP_READ )
println ( "Listener [$this] connected to [$id]" )
}
}
/**
* Speaker can send "words" to the connected Listeners.
*
* @property id The socket port for listening.
* @property words The data for sending through SocketChannel to the Listener.
* @constructor Creates a Speaker with threads that run behine to accept the new connections, and keep sending data to connected Listeners
*/
class Speaker ( id : Int, words : String ) {
val serverSocketChannel : ServerSocketChannel
val selector : Selector = Selector.open ( )
init {
serverSocketChannel = ServerSocketChannel.open ( )
serverSocketChannel.bind (
InetSocketAddress ( "localhost", id )
)
println ( "Created Speaker: ${id}" )
// create the thread for listening
Thread {
while ( true ) {
val listener = serverSocketChannel.accept ( )
selector.wakeup ( )
listener.configureBlocking ( false )
listener.register ( selector, SelectionKey.OP_WRITE ) // remember the listener
}
}.start ( )
Thread {
while ( true ) {
Thread.sleep ( 200L + ( 0..200 ).shuffled ( ).first ( ) )
// send the words to the listeners
selector.selectNow ( )
val selectedKeys = selector.selectedKeys ( )
selectedKeys.parallelStream ( )
.filter { it.isWritable ( ) && // expect to get new listener
it.channel ( ) is SocketChannel } // expect the SocketChannel, casted from SelectableChannel
.map { it.channel ( ) as SocketChannel } // cast as SocketChannel
.forEach {
val output = ByteBuffer.allocate ( words.length )
output.put ( words.toByteArray ( ) )
output.flip ( ) // reset the buffer position to forward data
println ( "[$id] say: $words" )
it.write ( output )
}
selectedKeys.clear ( )
}
}.start ( )
}
}
fun main ( argv : Array <String> ){
Speaker ( 44444, "Hi" )
Speaker ( 44445, ":)" )
Speaker ( 44446, "hmm" )
val listener1 = Listener ( )
listener1.addSpeaker ( 44444 )
listener1.addSpeaker ( 44445 )
listener1.addSpeaker ( 44446 )
val listener2 = Listener ( )
listener2.addSpeaker ( 44444 )
listener2.addSpeaker ( 44445 )
listener2.addSpeaker ( 44446 )
val listener3 = Listener ( )
listener3.addSpeaker ( 44444 )
listener3.addSpeaker ( 44445 )
listener3.addSpeaker ( 44446 )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment