Created
September 15, 2010 05:19
-
-
Save zentrope/580270 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A small chat server in order to work out the details of a
// socket server using Actors. And maybe just a little attempt
// to wrap TCP itself inside an Actor, kinda like Erlang does
// it. The whole idea is to try and make things seem simpler,
// ultimately, and to remove any shared state.
package zentrope.chat { | |
import java.net.Socket | |
import java.net.ServerSocket | |
import scala.actors.Actor | |
import scala.actors.Actor.actor | |
import scala.actors.DaemonActor | |
import scala.actors.Exit | |
import scala.actors.TIMEOUT | |
object Util {

  /**
   * Registers `body` to be run when the JVM shuts down.
   *
   * `body` is a by-name argument: it is not evaluated by this call, only
   * from the `run` method of the thread handed to
   * `Runtime.addShutdownHook`.
   *
   * @param body the code to execute from the shutdown-hook thread
   */
  def addShutdownHook(body: => Unit): Unit = {
    val hook = new Thread {
      override def run(): Unit = body
    }
    Runtime.getRuntime.addShutdownHook(hook)
  }
}
object Router extends DaemonActor {

  // Keeps the list of participating clients and fans each broadcast
  // out to every client on that list. The client that produced a message
  // is on the list too, so it also receives its own message back.

  // Messages understood by the router actor.
  case class AddClient(client: Client)
  case class RemoveClient(client: Client)
  case class Broadcast(msg: String)

  /** Logs the registration and asks the router actor to track `client`. */
  def add(client: Client): Unit = {
    println(" :: router adding " + client)
    this ! AddClient(client)
  }

  /** Logs the removal and asks the router actor to forget `client`. */
  def remove(client: Client): Unit = {
    println(" :: router removing " + client)
    this ! RemoveClient(client)
  }

  /** Asks the router actor to hand `msg` to every registered client. */
  def broadcast(msg: String): Unit = {
    this ! Broadcast(msg)
  }

  /**
   * Actor body. The client list is a local variable that is only touched
   * from inside the message handler below.
   */
  def act(): Unit = {
    var members: List[Client] = Nil
    loop {
      react {
        case AddClient(joining) =>
          members = joining :: members
        case RemoveClient(leaving) =>
          members = members.filterNot(_ == leaving)
        case Broadcast(text) =>
          for (member <- members) member.broadcast(text)
      }
    }
  }
}
//========================================================================= | |
class Client(val socket: Socket, val id: Int) extends DaemonActor {

  import java.io._

  // Messages this actor sends to itself or receives via `broadcast`.
  private case class TcpLine(line: String) // a line was read from the socket
  private case class TcpSend(line: String) // write this line to the socket
  private case object TcpRead              // perform the next read

  override def toString(): String = "<client:%d>".format(id)

  /** Queues `msg` to be written to this client's socket. */
  def broadcast(msg: String): Unit = {
    this ! TcpSend(msg)
  }

  /**
   * Closes the socket and unregisters this client from the Router.
   *
   * The Router removal sits in a `finally` so the client is dropped from
   * the broadcast list even when closing the socket throws.
   *
   * @param reason text included in the log line
   */
  def close(reason: String): Unit = {
    println(" :: closing client [%d] due to [%s].".format(id, reason))
    try socket.close()
    finally Router.remove(this)
  }

  /**
   * Reads one line (blocking) and sends it to this actor as a `TcpLine`.
   *
   * Throws when the stream has ended (`readLine` returned null) or the line
   * is exactly "quit"; the exception is deliberately left uncaught here.
   */
  private def read(in: BufferedReader): Unit =
    in.readLine() match {
      case null   => throw new Exception("Client disconnected.")
      case "quit" => throw new Exception("Client quit.")
      case line   => this ! TcpLine(line)
    }

  /**
   * Writes `line` to the supplied writer and flushes it.
   *
   * Uses the writer it is given; a local `val out` here used to shadow the
   * parameter and build a fresh PrintWriter on every call.
   */
  private def write(out: PrintWriter, line: String): Unit = {
    out.println(line)
    out.flush()
  }

  /** Starts the actor and queues the first read request. */
  override def start() = {
    super.start()
    this ! TcpRead
    this
  }

  /**
   * Actor body: wraps the socket streams once, registers with the Router,
   * then handles read, send and line messages until a handler throws.
   *
   * NOTE(review): `read` blocks inside `readLine`, so `TcpSend` messages that
   * arrive meanwhile are only written after the peer sends its next line.
   */
  def act(): Unit = {
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()))
    Router.add(this)
    loop {
      react {
        case TcpRead =>
          read(in)
        case TcpSend(line) =>
          write(out, line)
        case TcpLine(line) =>
          val msg = "client.%d> %s".format(id, line)
          println(msg)
          Router.broadcast(msg)
          // Ask for the next line only after this one has been handed on.
          this ! TcpRead
      }
    }
  }
}
//========================================================================= | |
class ChatServer(val port: Int) extends DaemonActor {

  // Messages this actor sends to itself.
  private case object TcpAccept                // accept one more connection
  private case object TcpClose                 // close the listening socket
  private case class NewClient(client: Client) // a connection was accepted

  private var serverSocket: Option[ServerSocket] = None

  /** Starts the actor and queues the first accept request. */
  override def start() = {
    super.start()
    this ! TcpAccept
    this
  }

  /** Asks the actor to close the listening socket. */
  def stop(): Unit = {
    this ! TcpClose
  }

  /**
   * Builds a printable reason from an exit exception without risking a
   * NullPointerException: prefers the wrapped cause when there is one, and
   * falls back to `toString` when no message is set.
   */
  private def describe(e: Throwable): String = {
    val root = Option(e.getCause).getOrElse(e)
    Option(root.getMessage).getOrElse(root.toString)
  }

  /**
   * Actor body: opens the listening socket on `port`, then handles accept
   * requests, close requests and Exit notifications from linked clients.
   */
  def act(): Unit = {
    trapExit = true
    Router.start
    // Local handle so the acceptor actor below does not have to read the
    // mutable `serverSocket` field (or call Option.get) from another thread.
    val listener = new ServerSocket(port)
    serverSocket = Some(listener)
    var clientId: Int = 0
    loop {
      react {
        case NewClient(client) =>
          link(client)
          this ! TcpAccept
        case TcpAccept =>
          clientId += 1
          val id = clientId
          val server = this
          actor {
            // Run inside an actor in order to avoid blocking
            // this actor. We want to receive Exits from clients.
            val clientSocket = listener.accept()
            val client = new Client(clientSocket, id)
            client.start
            server ! NewClient(client)
          }
        case TcpClose =>
          serverSocket foreach { _.close() }
        case Exit(client: Client, e: Exception) =>
          client.close(describe(e))
      }
    }
  }
}
//========================================================================= | |
object Lock {

  // One-shot latch: acquire() blocks until release() has been called once.

  val lock: AnyRef = new Object()

  // Guarded by `lock`. Set by release() and never cleared, so a release()
  // that happens before acquire() is not lost.
  private var released = false

  /**
   * Blocks the calling thread until `release()` has been called.
   * Returns immediately if it already has been. The wait sits in a loop so
   * that a wakeup without a preceding `release()` does not end the wait.
   */
  def acquire(): Unit = lock.synchronized {
    while (!released) lock.wait()
  }

  /** Marks the latch as released and wakes every thread blocked in `acquire()`. */
  def release(): Unit = lock.synchronized {
    released = true
    lock.notifyAll()
  }
}
//========================================================================= | |
object Main {

  /**
   * Keeps the chat server running: starts a ChatServer, links to it, and
   * starts a replacement whenever an Exit carrying an exception arrives.
   *
   * @param port TCP port handed to every ChatServer this supervisor creates
   */
  class Supervisor(val port: Int = 9999) extends DaemonActor {

    case class Stop()

    /**
     * Asks the supervisor to stop the server and leave its loop.
     *
     * Sends a `Stop()` instance. The bare name `Stop` is the companion
     * object, which the `case Stop()` handler below never matches.
     */
    def stop() = this ! Stop()

    /** Starts the actor and returns this supervisor. */
    def startUp(): Supervisor = { start(); this }

    def act(): Unit = {
      trapExit = true
      var keepGoing = true
      var server = new ChatServer(port)
      server.start()
      link(server)
      loopWhile(keepGoing) {
        react {
          case Stop() =>
            server.stop()
            keepGoing = false
          case Exit(from: Actor, e: Exception) =>
            println(" !! chatserver exited, reason: " + e.toString)
            server.stop()
            server = new ChatServer(port)
            link(server)
            server.start()
        }
      }
    }
  }

  /**
   * Entry point: starts a Supervisor, then blocks until the shutdown hook
   * releases the Lock, and finally asks the supervisor to stop.
   */
  def main(args: Array[String]): Unit = {
    println("Hello chat.")
    println(" :: starting server")
    Util.addShutdownHook {
      println("\n :: shutdown hook stopping server")
      Lock.release()
    }
    val runner = new Supervisor().startUp()
    Lock.acquire()
    runner.stop()
    println(" :: done serving")
  }
}
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here's a new version of my little TCP chat server. This one uses actors a lot more, with some Erlang-ish stuff sprinkled in. Rather than a client actor handling an exception (say, when a client disconnects), it instead throws an exception. The ChatServer actor is linked to it, so it gets the exit message and can do the right thing, which, in this case, is to close the socket.
I changed the main loop to use a Supervisor actor which traps exits from the ChatServer and restarts it. Clients will have to reconnect, of course, but it at least attempts to keep the server running. I can imagine adding some checks to make sure that the restarts haven't happened too often, and to put a time delay in there, etc, etc.
Code's a bit messy. I should move all the stuff that happens in the actor loop to functions so that the loops are very clean and readable. Nevertheless, I'm getting the idea of how these things work and how closely you can mimic a kind of Erlang interaction.
Neato.