-
-
Save zentrope/541810 to your computer and use it in GitHub Desktop.
package com.zentrope { | |
import scala.actors.Actor | |
import scala.actors.TIMEOUT | |
import scala.concurrent.Lock | |
import scala.concurrent.ops.spawn | |
import scala.util.Random | |
object Util { | |
def nameOf(thing: AnyRef): String = | |
thing.getClass().getSimpleName() | |
def addShutdownHook(body: => Unit) = | |
Runtime.getRuntime.addShutdownHook(new Thread { | |
override def run { body } | |
}) | |
} | |
// ---------------------------------------------------------------------- | |
trait Server extends Actor { | |
// Represents a server that the Kernel can manage. I can see this | |
// turning into a gen_server like thing. | |
case object StopIt | |
def startUp(): Unit = | |
this.start | |
def shutDown(): Unit = | |
this ! StopIt | |
// Could implement act() right here, then delegate to | |
// some sort of callback system which pattern matches on | |
// message type. | |
} | |
// ---------------------------------------------------------------------- | |
object Level extends Enumeration { | |
type Level = Value | |
val INFO = Value("INFO") | |
val WARN = Value("WARN") | |
val ERROR = Value("ERROR") | |
} | |
object LogServer extends Server { | |
// Logs all messages it receives to the console. An example of an | |
// actor that receives messages from other actors and then | |
// communicates them to the rest of the world. | |
import Level._ | |
private case class LOG_MSG(level: Level, from: AnyRef, msg: String) | |
private def printLog(level: Level, from: AnyRef, msg: String) { | |
val fmt = "%s: [%s] %s" | |
val out = String.format(fmt, level, Util.nameOf(from), msg) | |
println(out) | |
} | |
def log(level: Level, from: AnyRef, msg: String) = | |
LogServer ! LOG_MSG(level, from, msg) | |
def act() { | |
loop { | |
receive { | |
case LOG_MSG(level, from, msg) => | |
printLog(level,from, msg) | |
// case LOG_INFO(from, msg) => log("INFO", from, msg) | |
// case LOG_WARN(from, msg) => log("WARN", from, msg) | |
case StopIt => | |
printLog(Level.INFO, this, "Asked to stop."); | |
exit | |
} | |
} | |
} | |
} | |
trait Logging { | |
import Level._ | |
// These are partial functions that can be mixed in to an actor class | |
// to make the code easier to read. | |
private def log = LogServer.log _ | |
def info = log(Level.INFO, this, _: String ) | |
def warn = log(Level.WARN, this, _: String ) | |
def error = log(Level.ERROR, this, _: String ) | |
} | |
// ---------------------------------------------------------------------- | |
object RestServer extends Server with Logging { | |
import org.restlet._ | |
import org.restlet.data._ | |
import org.restlet.representation._ | |
import org.restlet.resource._ | |
import org.restlet.routing._ | |
class Hello extends ServerResource { | |
@Get("html") | |
def helloThere(): Representation = | |
new StringRepresentation("<h1>Hello!</h1>", MediaType.TEXT_HTML) | |
} | |
class RestApplication extends Application { | |
override def createInboundRoot(): Restlet = { | |
val router: Router = new Router(getContext()) | |
router.attach("/scala", classOf[Hello]) | |
router | |
} | |
} | |
case object StartRest | |
override def startUp = { | |
super.startUp() | |
this ! StartRest | |
} | |
def act() { | |
val component: Component = new Component() | |
loop { receive { | |
case StartRest => { | |
try { | |
component.getServers().add(Protocol.HTTP, 8080) | |
component.getDefaultHost().attach(new RestApplication()) | |
component.getLogService().setEnabled(false) | |
component.start() | |
} | |
catch { | |
case th: Throwable => { | |
error(th.toString); | |
} | |
} | |
} | |
case StopIt => { | |
info("Stopping rest service.") ; | |
component.stop() ; | |
info("Asked to stop."); | |
exit | |
} | |
}} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
object SysEventServer extends Server with Logging { | |
// Sends events to whoever is interested in getting them. | |
// Actors interested in SysEvent messages can import these | |
// case classes. | |
case class SysEvent(kind: String, note: String) | |
// Private case calsses are for sending internal messages. | |
private case class AddActor(actor: Actor) | |
def add(actor: Actor): Unit = | |
// Note that this method sends a message rather than updating | |
// a data structure. | |
this ! AddActor(actor) | |
def act() { | |
// We keep the state local to the loop to emphasize that all | |
// methods on this object should result in a message send. | |
// Otherwise, our concurrency is lost. | |
var actors: List[Actor] = List() | |
loop { | |
// Simulate that we're getting events from outside the app, | |
// say, a web hit, or a socket connection, or a request | |
// to do something interesting. | |
val to = (10 + Random.nextInt(10)) * 1000 | |
receiveWithin (to) { | |
case AddActor(actor) => | |
actors = actor :: actors | |
case TIMEOUT => | |
info("sys event generated") ; | |
actors foreach (h => | |
h ! SysEvent("FOO", "system event " + to)) | |
case StopIt => | |
info("Asked to stop.") ; | |
exit | |
} | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
object WarningServer extends Server with Logging { | |
import SysEventServer.SysEvent | |
override def startUp() = | |
super.startUp() ; | |
SysEventServer.add(this) | |
def act() { | |
loop { receiveWithin (10000) { | |
case TIMEOUT => warn("Warning! Um.... yeah!") | |
case SysEvent(k, n) => | |
warn("SYSEVENT: " + k + ", " + n) | |
case StopIt => info("Asked to stop."); exit | |
}} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
object PresenceServer extends Server with Logging { | |
import SysEventServer.SysEvent | |
var timeout: Int = 3000 // millis | |
override def startUp() = | |
super.startUp() ; | |
SysEventServer.add(this) | |
def act() { | |
var place = 1 | |
loop { receiveWithin(timeout) { | |
case SysEvent(k, n) => | |
warn("SYSEVENT: " + k + ", " + n) | |
case TIMEOUT => | |
info("sending presence " + place) ; | |
place = place + 1 | |
case StopIt => | |
info("Asked to stop.") | |
}} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
object Kernel { | |
// Start up a process, start Actors, then | |
// wait for a signal to shut down (i.e., ^C). | |
// Note: This should be an Actor itself so it can get notified | |
// via the "link" mechanism when one of the Servers dies so it | |
// can then restart it. | |
val lock: Lock = new Lock() | |
val servers: List[Server] = List( | |
LogServer, | |
PresenceServer, | |
WarningServer, | |
SysEventServer, | |
RestServer | |
) | |
def notifier() = { | |
println("\n") | |
servers.reverse.foreach (s => spawn { | |
println("stopping " + Util.nameOf(s)) | |
s.shutDown | |
}) | |
Thread.sleep(2000) | |
lock.release | |
} | |
def main(args: Array[String]) = { | |
Util.addShutdownHook { notifier() } | |
println("starting actors") | |
servers.foreach (s => spawn { | |
println("starting " + Util.nameOf(s)) | |
s.startUp | |
}) | |
println ("running ...") | |
lock.acquire | |
} | |
} | |
} |
Okay, added a lot more to the basic idea. Lots of different kinds of actors, a Server trait, and so on. Just exploring the possibilities. I already see the opportunity for some sort of Erlang OTP like behaviors, though I'm not sure how to get around the static typing. Probably the Server trait would receive messages of type Any, the delegate them to the callback class which does pattern matching. If there's a failure, the Server calls an "unknownMsg" callback to deal with it, or something like that.
Anyway, the one bad thing about Scala and Actors is that you really have to maintain discipline to make sure that your Actor objects' own methods send messages and don't update any state on their own.
And...
I've added a third version. This one extends the logging concern a bit with some enumerations, a Trait you can add to an object, etc, etc. The logging stuff is a bit over done mainly so I could discover the readability value of partial functions.
I'm warming a bit to Scala's improvements of the OO stuff on the JVM.
Also, I added an Actor to manage a web server based on my favorite JVM REST library, Restlets. This requires a ton of jars to work, but you know, that's how it is. I'm surprised at how easily it was to integrate with Java libs. Even the annotation worked.
Next up: See what it's like to integrate HornetQ. Once that's done, I guess I don't see why this couldn't work as an application shell (or really, pattern, or style) for lots of uses.
Just playing around with Scala for a bit. I like to write apps that start up, spin up a few actors, then wait. Each actor might do something interesting, I figure.
I can imagine starting up regular threadish things that, say, receive messages via an MQ or a Web thing, and then send a message to an actor to actually do something.
I sorta need a real use case to see how it works.