@zentrope
Created October 19, 2010 04:22
Example of having one actor create other actors to manage external resources dynamically.
// A sample app demonstrating one way to use actors to manage other
// actors created dynamically. Use case: an app that needs to maintain
// short-lived connections to remote resources (like a messaging queue).
import scala.actors._
import scala.actors.Actor._
class DelegateActor(val id: String) extends DaemonActor {

  // Represents an actor that does some work, or hangs on to
  // an external resource, such as a socket connection to an MQ
  // service. All it does is delegate incoming messages to the
  // resource (socket), and timeout if there is no activity (and
  // as a result, clean up the resource).

  private case class HandleMessage(msg: String)

  // Random idle timeout in milliseconds, somewhere from 0 to 99,900.
  private val timeout = 100 * (scala.math.random * 1000).toInt

  override def toString(): String = this.getClass.getSimpleName + "." + id

  def handle(msg: String) = this ! HandleMessage(msg)

  def act() {
    loop { reactWithin(timeout) {
      case HandleMessage(msg) =>
        // Imagine we're writing to a socket.
        println(format("[%s] got a message: [%s].", this, msg))
      case TIMEOUT => {
        println("self terminating " + this)
        exit('timedout)
      }
    }}
  }
}
object Controller extends DaemonActor {

  // Starts up actors as needed, monitors their exits, keeps a list of their
  // identities. When a user of this actor wants to send a message, it's
  // this actor's responsibility to delegate a message to an actor, creating
  // one if it doesn't already exist.

  private case class SendMessage(id: String, msg: String)

  override def toString(): String = this.getClass.getSimpleName

  def send(id: String, msg: String) = this ! SendMessage(id, msg)

  def act() {

    trapExit = true // So that we get notified when delegates timeout

    // The "state" managed by this actor: a map of delegates and their
    // ids.
    var delegates = Map[String, DelegateActor]()

    // A local function enabling us to update the state managed by the
    // actor and keep the react loop relatively uncluttered.
    def mkDelegate(id: String): DelegateActor = {
      // Returns a new delegate for ID linked to this and adds
      // it to our local state.
      val delegate = new DelegateActor(id)
      delegates = delegates + ((id, delegate))
      actor {
        delegate.start
      }
      link(delegate)
      delegate
    }

    // Loop forever.
    loop { reactWithin(5000) {

      case TIMEOUT => {
        // Just to make sure this actor is still running and so we
        // can do any cleanup or whatever.
        println("pulse on " + this + ", delegates = " + delegates.keys)
      }

      case Exit(actor: DelegateActor, reason: AnyRef) => {
        // When a delegate times out, we get a notification so we
        // can remove it from our catalog of existing delegates.
        println(format("actor [%s] closed due to [%s]", actor.id, reason))
        delegates = delegates filterNot { case (k, v) => k == actor.id }
        println("still tracking: " + delegates.keys)
      }

      case SendMessage(id, message) => {
        // Send a message to a delegate, creating a new one if we
        // can't find it in our catalog
        delegates.getOrElse(id, mkDelegate(id)).handle(message)
      }
    }}
  }
}
object Lock {
  val lock: AnyRef = new Object()
  def acquire { lock.synchronized { lock.wait() }}
  def release = { lock.synchronized { lock.notifyAll } }
}
object Delegate {

  def main(arg: Array[String]) {
    println("Hello.")

    Runtime.getRuntime.addShutdownHook(
      new Thread {
        override def run() {
          Lock.release
        }
      }
    )

    Controller.start

    Controller.send("banana", "a peel")
    Controller.send("sophocles", "an idea")
    Controller.send("zebra", "some grass")

    1.to(33) foreach { i =>
      Controller.send(i.toString(), i.toString() + " message")
    }

    Lock.acquire
  }
}
@zentrope

This is a fun bit of Scala + Actors to manage the case where you want to have numerous persistent connections to a resource (database, message queue, etc.), but want them to time out when they go idle.

And you want to use them easily, something like:

Resource.doSomething(resourceName, payload) 

The "Resource" module is responsible for creating a new Actor to handle the payload, or delegating to an already existing one.

It's kind of vague because I was after the pattern more than the specific use case. I'm still amazed at how "easy" Actors make things that used to seem hard. Easy and safe. You could do something similar using Java's thread primitives but, at least for me, I'd never be totally sure that it didn't have race conditions or bottlenecks. With Actors, the code reads like it behaves (which can't be overstated).

The bottleneck in the above is the Controller, which dispatches to Delegates one message at a time. I think it can do this so quickly that it's not worth trying to beat it. But if it did become a problem, then I think the way to solve it is to have each session that needs the resource hold a direct reference to the delegate, thus bypassing the Controller after an initial bootstrap. If the code that needed the resource was itself an actor, you could pass in a reference to that actor and thus set up direct two-way communication.
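
A minimal sketch of that bootstrap idea, assuming the same `scala.actors` library the gist uses (so Scala 2.8 through 2.10). The names `Registry`, `Worker` and `Lookup` are hypothetical and are not part of the gist. The registry is asked once for a handle, and the caller then talks to the worker directly:

```scala
import scala.actors._
import scala.actors.Actor._

// Hypothetical message: ask the registry for a direct handle to a worker.
case class Lookup(id: String)

// Hypothetical stand-in for DelegateActor, without the idle timeout.
class Worker(val id: String) extends DaemonActor {
  def act() {
    loop { react {
      case msg: String => println("[" + id + "] handling: " + msg)
    }}
  }
}

object Registry extends DaemonActor {

  def act() {
    // State owned by the registry: one worker per id.
    var workers = Map[String, Worker]()

    loop { react {
      case Lookup(id) =>
        // Reuse the existing worker, or create and start a new one.
        val worker = workers.getOrElse(id, {
          val created = new Worker(id)
          workers += (id -> created)
          created.start()
          created
        })
        // Hand the reference back to whoever asked.
        reply(worker)
    }}
  }

  // Synchronous bootstrap call: blocks until the registry replies.
  def lookup(id: String): Worker =
    (this !? Lookup(id)).asInstanceOf[Worker]
}
```

A caller would do `Registry.start()` once, then `val w = Registry.lookup("banana")`, and from then on `w ! "a peel"` without going back through the registry. One caveat if this were combined with the timeouts above: a cached reference can outlive its delegate, so a message sent to a delegate that has already exited would likely go unhandled. The caller would need some way to notice that and look the delegate up again.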

(The reason I didn't write it this way initially is that I'm thinking about this in the context of a stateless web service, rather than one which maintains some sort of session state. If I go with session state, each session could be an Actor, and could then have a link to a resource.)

At any rate, for some reason, this bit of code (for good or ill) seems a lot easier to reason about than other such code I've written.

All right, then. Um, uh: over and out.
