Skip to content

Instantly share code, notes, and snippets.

@zentrope
Created September 14, 2010 05:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/578610 to your computer and use it in GitHub Desktop.
Save zentrope/578610 to your computer and use it in GitHub Desktop.
// A small chat server in order to work out the details of a
// socket server using Actors. And maybe just a little attempt
// to wrap TCP itself inside an Actor, kinda like Erlang does
// it. The whole idea is to try and make things seem simpler,
// ultimately, and to remove any shared state.
package zentrope.chat {
import java.net.Socket
import java.net.ServerSocket
import scala.actors.DaemonActor
import scala.actors.Actor.actor
object Util {
def addShutdownHook(body: => Unit) =
Runtime.getRuntime.addShutdownHook(new Thread {
override def run { body }
})
}
object Router extends DaemonActor {
// Routes messages received from one client to
// all the other participating clients and also
// manages the list of participating clients.
case class AddClient(client: Client)
case class RemoveClient(client: Client)
case class Broadcast(msg: String)
def add(client: Client) = {
println (" :: router adding " + client) ;
this ! AddClient(client)
}
def remove(client: Client) = {
println (" :: router removing " + client)
this ! RemoveClient(client)
}
def broadcast(msg: String) =
this ! Broadcast(msg)
def act = {
var clients: List[Client] = List.empty
loop { react {
case AddClient(client) =>
clients = client :: clients
case RemoveClient(client) =>
clients = clients filter { c => c != client }
case Broadcast(msg) =>
clients foreach { client =>
client.broadcast(msg)
}
}}
}
}
//=========================================================================
class Client(socket: Socket, id: Int) extends DaemonActor {
private case class TcpLine(line: String)
private case class TcpSend(line: String)
private case class TcpClose(reason: String)
private case class TcpRead()
import java.io._
// Implicit methods tell the compiler how to turn the parameter's type
// into the return type. So, when you assign an InputStream to a
// val that is typed BufferedReader, this method is called.
private implicit def inputStreamWrapper(in: InputStream): BufferedReader =
new BufferedReader(new InputStreamReader(in))
private implicit def outputStreamWrapper(out: OutputStream): PrintWriter =
new PrintWriter(new OutputStreamWriter(out))
override def toString(): String = {
format("<client:%d>", id)
}
def broadcast(msg: String) {
this ! TcpSend(msg)
}
private def handle(func: => Unit) = {
actor {
try {
func
}
catch {
case t: Throwable =>
this ! TcpClose(t.getMessage)
}
}
}
private def read(in: BufferedReader) = {
handle {
val line = in.readLine()
line match {
case null =>
throw new Exception("Client closed socket.")
case "quit" =>
throw new Exception("Client quits.")
case _ =>
this ! TcpLine(line)
}
}
}
private def write(out: PrintWriter, line: String) = {
handle {
val out: PrintWriter = socket.getOutputStream()
out.println(line)
out.flush()
}
}
override def start() = {
super.start()
this ! TcpRead
this
}
def act() {
val in: BufferedReader = socket.getInputStream()
val out: PrintWriter = socket.getOutputStream()
Router.add(this)
var keepGoing = true
loopWhile(keepGoing) { react {
case TcpRead =>
read(in)
case TcpSend(line) =>
write(out, line)
case TcpLine(line) =>
val msg = format("client.%d> %s", id, line) ;
println(msg) ;
Router.broadcast(msg) ;
this ! TcpRead
case TcpClose(reason) =>
println(format(" :: closing client [%d] due to [%s].", id, reason))
socket.close() ;
Router.remove(this) ;
keepGoing = false
}}
}
}
//=========================================================================
class ChatServer(val port: Int) {
// Accepts incoming connections and spawns an actor
// for each one.
var serverSocket: Option[ServerSocket] = None
def loopUntilSocketClosed(func: => Unit) = {
try {
while (true) {
func
}
}
catch {
case (t: Throwable) => {
println(format(" :: server shutting down due to: [%s]",
t.getMessage))
}
}
}
def start() = {
Router.start
serverSocket = Some(new ServerSocket(port))
var clientId: Int = 0
loopUntilSocketClosed {
println(" :: waiting for a connection (aren't we all)")
val clientSocket = serverSocket.get.accept()
println(" :: got a connection")
val client = new Client(clientSocket, clientId)
println(" :: starting client actor " + client)
client.start
clientId += 1
}
println(" :: no longer accepting connections")
}
def stop() {
println(" :: closing server socket")
serverSocket match {
case None =>
;
case Some(s) =>
s.close()
}
}
}
//=========================================================================
object Main {
def stopServer(server: Option[ChatServer]) = server match {
case None =>
;
case Some(s) =>
s.stop()
}
def main(args: Array[String]) {
println("Hello chat.");
println(" :: starting server")
val server = Some(new ChatServer(9999))
Util.addShutdownHook {
println("\n :: shutdown hook stopping server")
stopServer(server)
}
try {
server.get.start()
}
catch {
case t: Throwable =>
println(" :: " + t);
}
println(" :: done serving")
}
}
}
@zentrope
Copy link
Author

A nearly featureless chat server, with some protection, but not really. It can be overwhelmed by too many connections, for instance.

Anyway, the challenge was to acquaint myself with how to write what is essentially a threaded socket server using actors. Notice there's never a mention of a thread!

The "state" of the chat server is maintained in an actor called the Router. When a client gets a line over a socket, it sends it to the router, which in turn sends a message to each client.

Part of there reason this is so large is because I was sorta going down the route of Erlang's TCP modules which hide a lot of the read/write details from the users of the them. For instance, I could imagine generalizing the above a little to make it a Line Oriented TCP Client of some sort, and then just registering handlers with it.

Or something.

Another adjustment would be to get the ChatServer class itself to be an Actor rather than an infinite loop. But it's late. So I'm going to leave it at that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment