Skip to content

Instantly share code, notes, and snippets.

@zentrope
Created September 15, 2010 05:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/580270 to your computer and use it in GitHub Desktop.
Save zentrope/580270 to your computer and use it in GitHub Desktop.
// A small chat server in order to work out the details of a
// socket server using Actors. And maybe just a little attempt
// to wrap TCP itself inside an Actor, kinda like Erlang does
// it. The whole idea is to try and make things seem simpler,
// ultimately, and to remove any shared state.
package zentrope.chat {
import java.net.Socket
import java.net.ServerSocket
import scala.actors.Actor
import scala.actors.Actor.actor
import scala.actors.DaemonActor
import scala.actors.Exit
import scala.actors.TIMEOUT
object Util {
  // Registers `body` to be run by the JVM at shutdown.
  //
  // `body` is a by-name parameter, so it is NOT evaluated when this
  // method is called; it is evaluated when the hook thread runs.
  def addShutdownHook(body: => Unit) = {
    val task = new Runnable {
      def run() { body }
    }
    val hook = new Thread(task)
    Runtime.getRuntime.addShutdownHook(hook)
  }
}
object Router extends DaemonActor {
  // Keeps the list of connected clients and fans each broadcast out
  // to every client currently on that list. The list lives inside
  // `act`, so it is only ever touched while handling a message.

  // Mailbox protocol understood by the router.
  case class AddClient(client: Client)
  case class RemoveClient(client: Client)
  case class Broadcast(msg: String)

  // Logs the registration and queues an AddClient message.
  def add(client: Client) = {
    println(" :: router adding " + client)
    this ! AddClient(client)
  }

  // Logs the removal and queues a RemoveClient message.
  def remove(client: Client) = {
    println(" :: router removing " + client)
    this ! RemoveClient(client)
  }

  // Queues `msg` for delivery to every registered client.
  def broadcast(msg: String) = {
    this ! Broadcast(msg)
  }

  def act(): Unit = {
    var members: List[Client] = Nil
    loop {
      react {
        case AddClient(joining) =>
          members = joining :: members
        case RemoveClient(leaving) =>
          members = members.filterNot(_ == leaving)
        case Broadcast(text) =>
          // Every member gets the text; no sender is carried in the
          // message, so the originating client receives it as well.
          members.foreach(_.broadcast(text))
      }
    }
  }
}
//=========================================================================
class Client(val socket: Socket, val id: Int) extends DaemonActor {
  // One connected chat participant, wrapping its TCP socket in an actor.
  //
  // socket: the accepted connection this client reads from and writes to.
  // id:     number used in log lines and as the "client.<id>" prefix.
  //
  // Failures are not handled here: `read` throws when the peer quits or
  // disconnects, which terminates this actor. Whoever is linked to it is
  // expected to react to the resulting exit and call `close`.

  import java.io._

  // Private mailbox protocol.
  private case class TcpLine(line: String) // a line was read from the socket
  private case class TcpSend(line: String) // a line should be written to the socket
  // A case object rather than `case class TcpRead()`: the code always sent
  // and matched the bare name `TcpRead`, i.e. a single value, not instances.
  private case object TcpRead

  override def toString(): String = {
    "<client:%d>".format(id)
  }

  // Queues `msg` to be written to this client's socket.
  def broadcast(msg: String) {
    this ! TcpSend(msg)
  }

  // Logs the reason, closes the socket, and deregisters from the Router.
  def close(reason: String) {
    println(" :: closing client [%d] due to [%s].".format(id, reason))
    socket.close()
    Router.remove(this)
  }

  // Reads one line and posts it back to this actor as a TcpLine.
  // Throws when the stream has ended (readLine returned null) or the
  // peer typed "quit"; the exception is deliberately left uncaught.
  //
  // NOTE(review): readLine blocks inside the message handler, so queued
  // TcpSend messages are presumably not written until the peer sends a
  // line. Confirm whether that latency is acceptable.
  private def read(in: BufferedReader) = {
    val line = in.readLine()
    line match {
      case null =>
        throw new Exception("Client disconnected.")
      case "quit" =>
        throw new Exception("Client quit.")
      case _ =>
        this ! TcpLine(line)
    }
  }

  // Writes `line` to `out` and flushes so it goes out immediately.
  // Uses the writer it is given; the previous version shadowed the
  // parameter and wrapped the socket stream in a new writer per call.
  private def write(out: PrintWriter, line: String) = {
    out.println(line)
    out.flush()
  }

  // Starts the actor and queues the first read request.
  override def start() = {
    super.start()
    this ! TcpRead
    this
  }

  def act() {
    // Built explicitly instead of through implicit conversions, so the
    // wrapping of the socket streams is visible at the point of use.
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()))
    Router.add(this)
    // Loops until an exception from `read`/`write` terminates the actor;
    // there is no normal exit path.
    loop { react {
      case TcpRead =>
        read(in)
      case TcpSend(line) =>
        write(out, line)
      case TcpLine(line) =>
        val msg = "client.%d> %s".format(id, line)
        println(msg)
        Router.broadcast(msg)
        // Ask for the next line only after this one has been handled.
        this ! TcpRead
    }}
  }
}
//=========================================================================
class ChatServer(val port: Int) extends DaemonActor {
  // Listens on `port`, creates a Client actor per accepted connection,
  // links to each client, and closes a client when its actor exits
  // with an exception.

  // Private mailbox protocol. TcpAccept/TcpClose are case objects: the
  // code always sent and matched the bare names, i.e. single values.
  private case object TcpAccept
  private case object TcpClose
  private case class NewClient(client: Client)

  // Starts the actor and queues the first accept request.
  override def start() = {
    super.start()
    this ! TcpAccept
    this
  }

  // Queues a request to close the listening socket.
  def stop() {
    this ! TcpClose
  }

  def act() {
    trapExit = true // deliver linked actors' exits as Exit messages
    Router.start
    // Held in a local val so the accept actor below captures an
    // immutable reference instead of reading a mutable Option field.
    val listener = new ServerSocket(port)
    var clientId: Int = 0
    loop { react {
      case NewClient(client) =>
        link(client)
        this ! TcpAccept
      case TcpAccept =>
        clientId += 1
        val id = clientId
        val server = this
        actor {
          // Run inside an actor in order to avoid blocking
          // this actor. We want to receive Exits from clients.
          val clientSocket = listener.accept()
          val client = new Client(clientSocket, id)
          client.start
          server ! NewClient(client)
        }
      case TcpClose =>
        listener.close()
      case Exit(client: Client, e: Exception) =>
        client.close(exitReason(e))
    }}
  }

  // Produces the text passed to Client.close for an exit exception.
  // Prefers the message of the exception's cause (what the original
  // code read), but falls back instead of throwing a
  // NullPointerException when the cause or its message is null.
  private def exitReason(e: Throwable): String = {
    val root = Option(e.getCause).getOrElse(e)
    Option(root.getMessage).getOrElse(root.toString)
  }
}
//=========================================================================
object Lock {
  // A one-shot gate: `acquire` blocks the caller until `release` has
  // been called. Once released it stays released.
  val lock: AnyRef = new Object()

  // Guarded by `lock`. Records that release() has happened, so that a
  // release arriving before acquire() is not lost and a spurious
  // wakeup from wait() does not let the caller through early.
  private var released = false

  // Blocks until release() has been called (returns at once if it
  // already has).
  def acquire() = {
    lock.synchronized {
      while (!released) {
        lock.wait()
      }
    }
  }

  // Opens the gate and wakes every thread blocked in acquire().
  def release() = {
    lock.synchronized {
      released = true
      lock.notifyAll()
    }
  }
}
//=========================================================================
object Main {

  // Keeps the chat server running: starts a ChatServer on `port`, and
  // starts a replacement whenever a linked actor exits with an
  // exception. `port` defaults to 9999, the value previously repeated
  // as a literal.
  class Supervisor(port: Int = 9999) extends DaemonActor {
    case class Stop()

    // Sends an actual Stop() instance. The previous `this ! Stop` sent
    // the companion object, which the `case Stop()` pattern below does
    // not match, so the stop request was never handled.
    def stop() = this ! Stop()

    def startUp(): Supervisor = { start ; this }

    def act() {
      trapExit = true // deliver the server's exit as an Exit message
      var keepGoing = true
      var server = launch()
      loopWhile (keepGoing) { react {
        case Stop() =>
          server.stop()
          keepGoing = false
        case Exit(from: Actor, e: Exception) =>
          println(" !! chatserver exited, reason: " + e.toString)
          // NOTE(review): stop() only queues a message on the old
          // server; if that actor has already terminated, its listening
          // socket is presumably never closed. Confirm the replacement
          // can bind the same port.
          server.stop()
          server = launch()
      }}
    }

    // Creates a ChatServer, links to it, then starts it. Linking first
    // (as the restart path already did) means an early failure of the
    // new server is still reported to this actor.
    private def launch(): ChatServer = {
      val fresh = new ChatServer(port)
      link(fresh)
      fresh.start
      fresh
    }
  }

  // Starts the supervisor, then parks the main thread until the
  // shutdown hook releases the Lock, and finally asks the supervisor
  // to stop.
  def main(args: Array[String]) {
    println("Hello chat.")
    println(" :: starting server")
    Util.addShutdownHook {
      println("\n :: shutdown hook stopping server")
      Lock.release()
    }
    val runner = new Supervisor().startUp()
    Lock.acquire()
    runner.stop()
    println(" :: done serving")
  }
}
}
@zentrope
Copy link
Author

Here's a new version of my little TCP chat server. This one uses actors a lot more with some Erlang-ish stuff sprinkled in. Rather than a client actor handling an exception (say when a client disconnects), it instead throws an exception. The ChatServer actor is linked to it, so it gets the exit message and can do the right thing, which in this case, is to close the socket.

I changed the main loop to use a Supervisor actor which traps exits from the ChatServer and restarts it. Clients will have to reconnect, of course, but it at least attempts to keep the server running. I can imagine adding some checks to make sure that the restarts haven't happened too often, and to put a time delay in there, etc., etc.

Code's a bit messy. I should move all the stuff that happens in the actor loop to functions so that the loops are very clean and readable. Nevertheless, I'm getting the idea of how these things work and how closely you can mimic a kind of Erlang interaction.

Neato.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment