@fanf
Last active June 19, 2019 10:34
Example of an async socket handling multiple connections in ZIO + nio
import scalaz.nio._
import scalaz.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import scalaz.zio.console._
import scalaz.zio._
object TestSocket extends App {

  override def run(args: List[String]): ZIO[Environment, Nothing, Int] = {
    theSocket.foldM(
      err => putStrLn(s"Execution Failed with: $err") *> ZIO.succeed(1),
      _ => ZIO.succeed(0)
    )
  }

  val theSocket =
    for {
      address     <- SocketAddress.inetSocketAddress("127.0.0.1", 1337)
      socket      <- AsynchronousServerSocketChannel()
      _           <- socket.bind(address)
      ref         <- Ref.make[Int](0)
      connections <- awaitConnection(ref, socket, doWork)
    } yield ()

  /*
   * Accept a connection from the server, fork the worker on it, and
   * loop to wait for the next connection.
   */
  def awaitConnection(
    ref: Ref[Int],
    socket: AsynchronousServerSocketChannel,
    worker: Int => AsynchronousSocketChannel => ZIO[Console, Throwable, Unit]
  ): ZIO[Console, Throwable, Nothing] = {
    for {
      i    <- ref.update(_ + 1)
      _    <- putStrLn("accept")
      // fork the worker so that this loop can go back to accept
      _    <- socket.accept.flatMap(s => worker(i)(s).ensuring(s.close.orDie).fork)
      // now loop for next connection
      _    <- putStrLn("next")
      loop <- awaitConnection(ref, socket, worker)
    } yield loop
  }

  /*
   * From a connected AsynchronousSocketChannel, read forever, until connection termination^W^W^W
   * Actually, read 16 bytes only once and close the connection: the loop needs to
   * handle client disconnects correctly, see comment.
   */
  def doWork(i: Int)(work: AsynchronousSocketChannel): ZIO[Console, Throwable, Unit] = {
    for {
      _     <- putStrLn(s"read from socket con $i")
      chunk <- work.read(16)
      str    = chunk.toArray.map(_.toChar).mkString
      _     <- putStrLn(s"content for con $i: " + str)
      // now loop
      // loop <- doWork(i)(work)
    } yield () //loop
  }
}
/*
// Run the program
accept
// connect with telnet on console #1
read from socket con 1
next
accept
// connect with telnet on console #2
read from socket con 2
next
accept
// write something on console #2
content for con 2: foo
// console #2: connection closed
// write something on console #1
content for con 1: bar
// console #1: connection closed
*/
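
The comment on `doWork` says the commented-out read loop would need to handle client disconnects. Here is a minimal sketch of such a stop condition. It uses a plain blocking `java.nio` channel, not the scalaz-nio API from the gist, and relies on `ReadableByteChannel.read` returning `-1` once the peer has closed. `ReadLoopSketch` and `readAll` are hypothetical names, and how scalaz-nio reports a disconnect is not shown in the gist.

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel
import scala.annotation.tailrec

object ReadLoopSketch {
  // Read up to 16 bytes at a time and collect each chunk as a string,
  // stopping when the channel reports end-of-stream.
  def readAll(ch: ReadableByteChannel): List[String] = {
    val buf = ByteBuffer.allocate(16)

    @tailrec
    def loop(acc: List[String]): List[String] = {
      buf.clear()
      val n = ch.read(buf)
      if (n < 0) acc.reverse // -1: the peer closed the connection, stop looping
      else {
        buf.flip()
        val bytes = new Array[Byte](buf.remaining())
        buf.get(bytes)
        loop(new String(bytes, "US-ASCII") :: acc)
      }
    }

    loop(Nil)
  }
}
```

In the ZIO version the same idea would presumably become a recursive `doWork` that recurses only while the read still yields data, but that depends on how the library surfaces end-of-stream.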

fanf commented Jun 17, 2019

Well, we iterated several times on where to put that fork :) It's because of how AsynchronousServerSocketChannel.accept is implemented: it takes a handler and waits for the handler to return before it continues and waits for another connection. So we have to fork the handler (doWork) so that line 33 completes and awaitConnection loops back to accept and waits for another connection.
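
To illustrate why the fork matters, here is a small stdlib-only sketch. It uses `scala.concurrent.Future` as a stand-in for a ZIO fiber and a `CountDownLatch` as a stand-in for "the client finally sends something". All names (`ForkSketch`, `handle`, `acceptAll`, `demo`) are hypothetical and not part of the gist or of scalaz-nio.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object ForkSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for a connection handler: it only finishes once the client
  // "writes", modelled as the latch being released (or after a 5s timeout).
  def handle(id: Int, clientSent: CountDownLatch): String = {
    blocking { clientSent.await(5, TimeUnit.SECONDS) }
    s"content for con $id"
  }

  // Accept loop: every handler is started in the background (the "fork"),
  // so the loop moves on to the next connection without waiting for it.
  def acceptAll(clients: List[(Int, CountDownLatch)]): List[Future[String]] =
    clients.map { case (id, latch) => Future(handle(id, latch)) }

  def demo(): List[String] = {
    val c1 = new CountDownLatch(1)
    val c2 = new CountDownLatch(1)
    val running = acceptAll(List(1 -> c1, 2 -> c2)) // both handlers started
    c2.countDown() // client 2 writes first, as in the gist's log
    val second = Await.result(running(1), 5.seconds)
    c1.countDown() // then client 1 writes
    val first = Await.result(running(0), 5.seconds)
    List(second, first)
  }
}
```

If `acceptAll` ran each handler inline instead of wrapping it in a `Future`, connection 2 could not be served until connection 1 had finished, which is the situation the fork avoids.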


vil1 commented Jun 18, 2019

makes sense, thanks for the explanation!
