Skip to content

Instantly share code, notes, and snippets.

@ericacm
Forked from viktorklang/minscalaactors.scala
Created August 10, 2012 15:41
Show Gist options
  • Save ericacm/3315119 to your computer and use it in GitHub Desktop.
Save ericacm/3315119 to your computer and use it in GitHub Desktop.
Minimalist Scala Actors
// ©2012 Viktor Klang
object MiniActor {
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.AtomicInteger
type Behavior = Any => Effect
sealed trait Effect extends (Behavior => Behavior)
case object Stay extends Effect { def apply(old: Behavior): Behavior = old }
case class Become(like: Behavior) extends Effect { def apply(old: Behavior): Behavior = like }
final val Die = Become(msg => { println("Dropping msg [" + msg + "] due to severe case of death."); Stay }) // Stay Dead plz
// The notion of an Address to where you can post messages to
trait Address { def !(msg: Any) }
private abstract class AtomicRunnableAddress extends Address with Runnable { val on = new AtomicInteger(0) }
// Seeded by the self-reference that yields the initial behavior
def apply(initial: Address => Behavior)(implicit e: Executor): Address =
// Memory visibility of "behavior" is guarded by "on" using volatile piggybacking
new AtomicRunnableAddress {
// Our awesome little mailbox, free of blocking and evil
private final val mbox = new ConcurrentLinkedQueue[Any]
// Rebindable top of the mailbox, bootstrapped to identity
private var behavior: Behavior = { case self: Address => Become(initial(self)) }
// As an optimization, we peek at our threads local copy of our behavior to see if we should bail out early
final override def !(msg: Any): Unit = behavior match {
case dead @ Die.`like` => dead(msg) // Efficiently bail out if we're _known_ to be dead
case _ => mbox.offer(msg); async() // Enqueue the message onto the mailbox and try to schedule for execution
}
// Switch ourselves off, and then see if we should be rescheduled for execution
final def run(): Unit = try {
if (on.get == 1) behavior = behavior(mbox.poll())(behavior)
} finally {
on.set(0); async()
}
// If there's something to process, and we're not already scheduled
final def async(): Unit = if(!mbox.isEmpty && on.compareAndSet(0, 1))
// Schedule to run on the Executor and back out on failure
try
e.execute(this)
catch {
case t: Throwable => on.set(0); throw t
}
} match {
// Make the actor self aware by seeding its address to the initial behavior
case a: Address => a ! a; a
}
}
//Usage
import MiniActor._
implicit val e: java.util.concurrent.Executor = java.util.concurrent.Executors.newCachedThreadPool
//Creates an actor that will, after it's first message is received, Die
val actor = MiniActor( self => msg => { println("self: " + self + " got msg " + msg); Die } )
actor ! "foo"
actor ! "foo"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment