Skip to content

Instantly share code, notes, and snippets.

@zentrope
Created August 21, 2010 05:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/541810 to your computer and use it in GitHub Desktop.
Save zentrope/541810 to your computer and use it in GitHub Desktop.
package com.zentrope {
import scala.actors.Actor
import scala.actors.TIMEOUT
import scala.concurrent.Lock
import scala.concurrent.ops.spawn
import scala.util.Random
object Util {
def nameOf(thing: AnyRef): String =
thing.getClass().getSimpleName()
def addShutdownHook(body: => Unit) =
Runtime.getRuntime.addShutdownHook(new Thread {
override def run { body }
})
}
// ----------------------------------------------------------------------
trait Server extends Actor {
// Represents a server that the Kernel can manage. I can see this
// turning into a gen_server like thing.
case object StopIt
def startUp(): Unit =
this.start
def shutDown(): Unit =
this ! StopIt
// Could implement act() right here, then delegate to
// some sort of callback system which pattern matches on
// message type.
}
// ----------------------------------------------------------------------
object Level extends Enumeration {
type Level = Value
val INFO = Value("INFO")
val WARN = Value("WARN")
val ERROR = Value("ERROR")
}
object LogServer extends Server {
// Logs all messages it receives to the console. An example of an
// actor that receives messages from other actors and then
// communicates them to the rest of the world.
import Level._
private case class LOG_MSG(level: Level, from: AnyRef, msg: String)
private def printLog(level: Level, from: AnyRef, msg: String) {
val fmt = "%s: [%s] %s"
val out = String.format(fmt, level, Util.nameOf(from), msg)
println(out)
}
def log(level: Level, from: AnyRef, msg: String) =
LogServer ! LOG_MSG(level, from, msg)
def act() {
loop {
receive {
case LOG_MSG(level, from, msg) =>
printLog(level,from, msg)
// case LOG_INFO(from, msg) => log("INFO", from, msg)
// case LOG_WARN(from, msg) => log("WARN", from, msg)
case StopIt =>
printLog(Level.INFO, this, "Asked to stop.");
exit
}
}
}
}
trait Logging {
import Level._
// These are partial functions that can be mixed in to an actor class
// to make the code easier to read.
private def log = LogServer.log _
def info = log(Level.INFO, this, _: String )
def warn = log(Level.WARN, this, _: String )
def error = log(Level.ERROR, this, _: String )
}
// ----------------------------------------------------------------------
object RestServer extends Server with Logging {
import org.restlet._
import org.restlet.data._
import org.restlet.representation._
import org.restlet.resource._
import org.restlet.routing._
class Hello extends ServerResource {
@Get("html")
def helloThere(): Representation =
new StringRepresentation("<h1>Hello!</h1>", MediaType.TEXT_HTML)
}
class RestApplication extends Application {
override def createInboundRoot(): Restlet = {
val router: Router = new Router(getContext())
router.attach("/scala", classOf[Hello])
router
}
}
case object StartRest
override def startUp = {
super.startUp()
this ! StartRest
}
def act() {
val component: Component = new Component()
loop { receive {
case StartRest => {
try {
component.getServers().add(Protocol.HTTP, 8080)
component.getDefaultHost().attach(new RestApplication())
component.getLogService().setEnabled(false)
component.start()
}
catch {
case th: Throwable => {
error(th.toString);
}
}
}
case StopIt => {
info("Stopping rest service.") ;
component.stop() ;
info("Asked to stop.");
exit
}
}}
}
}
// ----------------------------------------------------------------------
object SysEventServer extends Server with Logging {
// Sends events to whoever is interested in getting them.
// Actors interested in SysEvent messages can import these
// case classes.
case class SysEvent(kind: String, note: String)
// Private case calsses are for sending internal messages.
private case class AddActor(actor: Actor)
def add(actor: Actor): Unit =
// Note that this method sends a message rather than updating
// a data structure.
this ! AddActor(actor)
def act() {
// We keep the state local to the loop to emphasize that all
// methods on this object should result in a message send.
// Otherwise, our concurrency is lost.
var actors: List[Actor] = List()
loop {
// Simulate that we're getting events from outside the app,
// say, a web hit, or a socket connection, or a request
// to do something interesting.
val to = (10 + Random.nextInt(10)) * 1000
receiveWithin (to) {
case AddActor(actor) =>
actors = actor :: actors
case TIMEOUT =>
info("sys event generated") ;
actors foreach (h =>
h ! SysEvent("FOO", "system event " + to))
case StopIt =>
info("Asked to stop.") ;
exit
}
}
}
}
// ----------------------------------------------------------------------
object WarningServer extends Server with Logging {
import SysEventServer.SysEvent
override def startUp() =
super.startUp() ;
SysEventServer.add(this)
def act() {
loop { receiveWithin (10000) {
case TIMEOUT => warn("Warning! Um.... yeah!")
case SysEvent(k, n) =>
warn("SYSEVENT: " + k + ", " + n)
case StopIt => info("Asked to stop."); exit
}}
}
}
// ----------------------------------------------------------------------
object PresenceServer extends Server with Logging {
import SysEventServer.SysEvent
var timeout: Int = 3000 // millis
override def startUp() =
super.startUp() ;
SysEventServer.add(this)
def act() {
var place = 1
loop { receiveWithin(timeout) {
case SysEvent(k, n) =>
warn("SYSEVENT: " + k + ", " + n)
case TIMEOUT =>
info("sending presence " + place) ;
place = place + 1
case StopIt =>
info("Asked to stop.")
}}
}
}
// ----------------------------------------------------------------------
object Kernel {
// Start up a process, start Actors, then
// wait for a signal to shut down (i.e., ^C).
// Note: This should be an Actor itself so it can get notified
// via the "link" mechanism when one of the Servers dies so it
// can then restart it.
val lock: Lock = new Lock()
val servers: List[Server] = List(
LogServer,
PresenceServer,
WarningServer,
SysEventServer,
RestServer
)
def notifier() = {
println("\n")
servers.reverse.foreach (s => spawn {
println("stopping " + Util.nameOf(s))
s.shutDown
})
Thread.sleep(2000)
lock.release
}
def main(args: Array[String]) = {
Util.addShutdownHook { notifier() }
println("starting actors")
servers.foreach (s => spawn {
println("starting " + Util.nameOf(s))
s.startUp
})
println ("running ...")
lock.acquire
}
}
}
@zentrope
Copy link
Author

Just playing around with Scala for a bit. I like to write apps that start up, spin up a few actors, then wait. Each actor might do something interesting, I figure.

I can imagine starting up regular threadish things that, say, receive messages via an MQ or a Web thing, and then send a message to an actor to actually do something.

I sorta need a real use case to see how it works.

@zentrope
Copy link
Author

Okay, added a lot more to the basic idea. Lots of different kinds of actors, a Server trait, and so on. Just exploring the possibilities. I already see the opportunity for some sort of Erlang OTP like behaviors, though I'm not sure how to get around the static typing. Probably the Server trait would receive messages of type Any, the delegate them to the callback class which does pattern matching. If there's a failure, the Server calls an "unknownMsg" callback to deal with it, or something like that.

Anyway, the one bad thing about Scala and Actors is that you really have to maintain discipline to make sure that your Actor objects' own methods send messages and don't update any state on their own.

@zentrope
Copy link
Author

And...

I've added a third version. This one extends the logging concern a bit with some enumerations, a Trait you can add to an object, etc, etc. The logging stuff is a bit over done mainly so I could discover the readability value of partial functions.

I'm warming a bit to Scala's improvements of the OO stuff on the JVM.

Also, I added an Actor to manage a web server based on my favorite JVM REST library, Restlets. This requires a ton of jars to work, but you know, that's how it is. I'm surprised at how easily it was to integrate with Java libs. Even the annotation worked.

Next up: See what it's like to integrate HornetQ. Once that's done, I guess I don't see why this couldn't work as an application shell (or really, pattern, or style) for lots of uses.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment