Skip to content

Instantly share code, notes, and snippets.

@quelgar
Last active August 22, 2017 00:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/03e9f7da8a3075a69c9580791a309dd3 to your computer and use it in GitHub Desktop.
Save quelgar/03e9f7da8a3075a69c9580791a309dd3 to your computer and use it in GitHub Desktop.
Testing accepting TCP connections with Monix NIO.
package monix.rsocket.tcp
import java.nio.charset.StandardCharsets
import monix.eval.{Callback, Task}
import monix.reactive.Observable
import monix.nio.tcp._
import monix.execution.Scheduler.Implicits.global
/**
* Test TCP server that keeps accepting more client connections.
*
* Two problems:
*
* 1) Can't figure out how to shutdown the accept loop cleanly.
* Any attempt to close the `TaskServerSocketChannel` when it's suspended in the `accept` call results in a
* `java.nio.channels.AsynchronousCloseException`. The program never exits when main finishes.
* On my Mac, I end up with two running threads in `KQueue.keventPoll`.
*
* 2) Current code runs the actual work synchronously within the accept loop. Need to figure out the best way
* to run it asynchronously so we don't prevent accepting new connections.
*/
object MonixNioTest {
def main(args: Array[String]): Unit = {
val serverTask = asyncServer(java.net.InetAddress.getByName(null).getHostName, 9001)
def accept(server: TaskServerSocketChannel): Task[TaskServerSocketChannel] = {
for {
socket <- {
println("Accept")
server.accept()
}
clientChannel = {
println(s"Accepted socket: $socket")
readWriteAsync(socket)
}
reader <- clientChannel.tcpObservable
writer <- clientChannel.tcpConsumer
length <- reader.map {
array =>
('['.toByte +: array :+ ':'.toByte) ++ (array.reverse ++ "] ".getBytes(StandardCharsets.UTF_8))
}.doOnTerminateEval(_ => clientChannel.stopWriting()).consumeWith(writer)
_ <- accept(server)
} yield {
println(s"Wrote $length bytes")
server
}
}
def serverProgram = serverTask.flatMap {
server =>
accept(server).doOnCancel(Task.defer(server.close()))
}
def clientProgram = {
val client = readWriteAsync("localhost", 9001, 256 * 1024)
for {
writer <- client.tcpConsumer
reader <- client.tcpObservable
written <- Observable.range(1000L, 1005L).map(i => i.toString.getBytes(StandardCharsets.UTF_8)).consumeWith(writer)
_ <- client.stopWriting().doOnFinish(x => Task.now(println(s"Client stop writing done: $x")))
_ <- reader.doOnTerminateEval {
x =>
println(s"Reader terminated: $x")
client.close()
}.foreachL {
array =>
println(s"client got: ${new String(array, StandardCharsets.UTF_8)}")
}
_ <- client.close().doOnFinish(x => Task.now(println(s"Client close done: $x")))
} yield {
written
}
}
val serverFuture = serverProgram.runAsync(new Callback[TaskServerSocketChannel] {
override def onSuccess(value: TaskServerSocketChannel): Unit = {
println(s"Server program done")
}
override def onError(ex: Throwable): Unit = {
println("Server error")
ex.printStackTrace()
}
})
sys.addShutdownHook {
println("Shutdown hook running")
serverFuture.cancel()
}
clientProgram.zipMap(clientProgram)(_ + _).runAsync(new Callback[Long] {
override def onError(ex: Throwable): Unit = {
ex.printStackTrace()
}
override def onSuccess(value: Long): Unit = {
println(s"Client wrote $value bytes")
}
})
while (scala.io.StdIn.readLine("Q to quit:").toUpperCase() != "Q") {
}
serverFuture.cancel() // is there a better way?
println("Main done!")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment