Skip to content

Instantly share code, notes, and snippets.

@searler
Created July 19, 2015 21:45
Show Gist options
  • Save searler/2c2f4c90b425ab6170fd to your computer and use it in GitHub Desktop.
Save searler/2c2f4c90b425ab6170fd to your computer and use it in GitHub Desktop.
Serialized TCP Server using Akka IO
package io
import java.net.InetSocketAddress
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.FSM
import akka.actor.Props
import akka.io.IO
import akka.io.Tcp
import akka.io.Tcp.Abort
import akka.io.Tcp.Bind
import akka.io.Tcp.Bound
import akka.io.Tcp.CommandFailed
import akka.io.Tcp.Connected
import akka.io.Tcp.PeerClosed
import akka.io.Tcp.Received
import akka.io.Tcp.Register
import akka.io.Tcp.Write
import akka.util.ByteString
/**
* TCP server over a serialized resource that permits interaction with at most one client at a time
*/
object ServerIO extends App {
// Actor system hosting every actor below; declared implicit so IO(Tcp) can pick it up.
implicit val system: ActorSystem = ActorSystem("Sys")

// The manager arbitrates which client may talk to the Resource actor it wraps.
// The Resource is created inside the Props creator expression, exactly where the
// ResourceManager itself is instantiated.
val manager: ActorRef =
  system.actorOf(
    Props(new ResourceManager(system.actorOf(Props[Resource]))),
    name = "manager")

// TCP front end: accepts connections and spawns one ConnectionHandler per client.
val server: ActorRef =
  system.actorOf(Props[Server], name = "server")
/**
 * Listening actor: binds a TCP socket on 127.0.0.1:9999 and creates one
 * [[ConnectionHandler]] child for every accepted connection.
 */
class Server extends Actor {
  // Request the bind as soon as the actor is constructed; the result comes back
  // as either Bound or CommandFailed.
  IO(Tcp) ! Bind(self, new InetSocketAddress("127.0.0.1", 9999))

  def receive = {
    case Bound(_) =>
      // Bind succeeded; nothing further to do.

    case CommandFailed(_: Bind) =>
      // Could not bind; this actor has no purpose without a listening socket.
      context.stop(self)

    case Connected(remote, _) =>
      // Capture the connection actor eagerly: the Props creator is evaluated later,
      // when sender() would no longer be meaningful.
      val connection = sender()
      val handlerProps = Props(new ConnectionHandler(manager, connection))
      context.actorOf(handlerProps, name = "handler" + remote.getPort)
  }
}
/**
 * Per-connection actor that relays traffic between one TCP connection and the manager.
 *
 * On construction it announces itself to the manager with `Arrived(self)` and registers
 * itself as the handler of the connection. Bytes received from the socket are forwarded
 * to the manager; `ByteString` messages sent to this actor are written to the socket.
 *
 * The handler stops itself whenever the connection ends, for any reason (peer close,
 * error, abort, ...), and tells the manager with `Gone(self)`. When the manager evicts it
 * with `Die`, it aborts the connection and then waits for the close notification before
 * stopping; no `Gone` is sent in that case because the manager has already replaced it.
 *
 * @param manager    actor that receives `Arrived`, `Gone` and the inbound data
 * @param connection Akka IO connection actor for this client
 */
class ConnectionHandler(manager: ActorRef, connection: ActorRef) extends Actor {
  manager ! Arrived(self)
  connection ! Register(self)

  def receive = {
    case Received(data) =>
      manager ! data

    case data: ByteString =>
      connection ! Write(data)

    case Die =>
      // Evicted by the manager: drop the connection and wait for its close event.
      connection ! Abort
      context.become(evicted)

    case _: Tcp.ConnectionClosed =>
      // Covers PeerClosed as well as ErrorClosed, Closed, ConfirmedClosed and Aborted,
      // so the handler never outlives its connection and the manager is always told.
      manager ! Gone(self)
      context.stop(self)
  }

  /** Behaviour after `Die`: ignore late traffic and stop once the connection is closed. */
  private def evicted: Receive = {
    case _: Tcp.ConnectionClosed =>
      context.stop(self)

    case _: Received | _: ByteString =>
      // Data still in flight for a client that has been replaced; discard it.
  }
}
// --- Messages exchanged between ConnectionHandler and ResourceManager ---

// Sent by the manager to the previously active client when a new client arrives;
// the receiving handler aborts its connection.
case object Die
// Sent by a connection handler to the manager to announce itself as a new client.
case class Arrived(client: ActorRef)
// Sent by a connection handler to the manager when its connection has ended and the
// handler is stopping.
case class Gone(client: ActorRef)

// --- States of the ResourceManager FSM ---

sealed trait State
// No client is currently attached (the FSM's initial state).
case object Idle extends State
// Exactly one client is attached; the FSM's state data holds its ActorRef.
case object Running extends State
/**
 * Finite state machine that serializes access to `resource`: at most one client is
 * attached at any time, and the most recently arrived client always wins.
 *
 * State data is the currently attached client, if any.
 *  - `Idle` / `None`: no client. An `Arrived` attaches the client and moves to `Running`.
 *  - `Running` / `Some(client)`: a new `Arrived` evicts the current client with `Die` and
 *    attaches the newcomer. Data from the current client is passed to the resource with
 *    the client as the sender, so the resource replies straight to the client. A `Gone`
 *    from the current client returns the FSM to `Idle`.
 *
 * Data and `Gone` notifications from clients that are no longer current are ignored.
 *
 * @param resource actor that processes the client's data and replies to its sender
 */
class ResourceManager(resource: ActorRef) extends FSM[State, Option[ActorRef]] {
  startWith(Idle, None)

  when(Idle) {
    case Event(Arrived(newClient), None) =>
      goto(Running) using Some(newClient)

    case Event(Gone(_), None) =>
      // Late notification from a client that was already replaced or removed.
      stay
  }

  when(Running) {
    case Event(Arrived(newClient), Some(oldClient)) =>
      oldClient ! Die
      stay using Some(newClient)

    case Event(data: ByteString, Some(client)) if sender() == client =>
      resource.tell(data, client)
      stay

    case Event(_: ByteString, Some(_)) =>
      // Data still in flight from an evicted client: forwarding it would make the
      // resource answer the *current* client with someone else's request, so drop it.
      stay

    case Event(Gone(oldClient), Some(client)) if oldClient == client =>
      goto(Idle) using None

    case Event(Gone(_), Some(_)) =>
      // A client that was already evicted has finished closing; nothing changes.
      stay
  }

  // Recommended by the FSM documentation as the last call in the constructor.
  initialize()
}
/**
 * The serialized resource. This implementation is a plain echo: every `ByteString`
 * it receives is sent back unchanged to the message's sender.
 */
class Resource extends Actor {
  def receive = {
    case payload: ByteString =>
      val requester = sender()
      requester ! payload
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment