Skip to content

Instantly share code, notes, and snippets.

@tnhu
Forked from viktorklang/minscalaactors.scala
Created April 13, 2012 07:25
Show Gist options
  • Save tnhu/2374798 to your computer and use it in GitHub Desktop.
Save tnhu/2374798 to your computer and use it in GitHub Desktop.
Minimalist Scala Actors
©2012 Viktor Klang
object Actor {
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.{AtomicBoolean}
type Behavior = Any => Effect
sealed trait Effect { def getOrElse(old: Behavior): Behavior }
case object Stay extends Effect { def getOrElse(old: Behavior): Behavior = old }
case class Become(like: Behavior) extends Effect { def getOrElse(old: Behavior): Behavior = like }
final val Die = Become(msg => { println("Dropping msg [" + msg + "] for [" + this + "] due to severe case of death."); Stay }) // Stay Dead plz
trait Address { def !(msg: Any): Unit } // The notion of an Address to where you can post messages to
def apply(initial: Address => Behavior)(implicit e: Executor): Address = // Seeded by the self-reference that yields the initial behavior
new Address with Runnable {
private final val mbox = new ConcurrentLinkedQueue[Any] // Our awesome little mailbox, free of blocking and evil
private final val on = new AtomicBoolean(false) // A flag telling us whether we're currently enqueued "on" the Executor or not
private var behavior: Behavior = { case self: Address => Become(initial(self)) } // Rebindable top of the mailbox, bootstrapped to identity for open recursion
final override def !(msg: Any): Unit = behavior match { // As an optimization, we peek at our threads local copy of our behavior to see if we should bail out early
case dead @ Die.`like` => dead(msg) // Efficiently bail out if we're _known_ to be dead
case _ => mbox.offer(msg); trySchedule() // Enqueue the message onto the mailbox and try to schedule for execution
}
final def run(): Unit =
try { behavior = behavior(mbox.poll()) getOrElse behavior } finally { on.set(false); trySchedule() } // Switch ourselves off, and then see if we should be rescheduled for execution
final def trySchedule(): Unit = if(!mbox.isEmpty && on.compareAndSet(false, true)) // If there's something to process, and we're not already scheduled
try e.execute(this) catch { case t => on.set(false); throw t } // Schedule to run on the Executor and back out on failure
} match { case a: Address => a ! a; a } // Make the actor self aware by seeding its address to the initial behavior
}
//Usage
import Actor._
implicit val e: java.util.concurrent.Executor = java.util.concurrent.Executors.newCachedThreadPool
//Creates an actor that will, after it's first message is received, Die
val actor = Actor( self => msg => { println("self: " + self + " got msg " + msg); Die } )
actor ! "foo"
actor ! "foo"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment