Skip to content

Instantly share code, notes, and snippets.

@knutwalker
Last active October 21, 2015 08:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knutwalker/9ec560c6c123b4c8e5f6 to your computer and use it in GitHub Desktop.
Save knutwalker/9ec560c6c123b4c8e5f6 to your computer and use it in GitHub Desktop.
Demonstrate how to break actor encapsulation with akka typed
#!/usr/bin/env scalas
// Run with conscript: http://www.scala-sbt.org/0.13/docs/Scripts.html
// Example run:
//
// $ ./typed-breakage.scala
// [state-1]: count = 21776 (should be 10000)
// [state-3]: count = 28473 (should be 10000)
// [state-2]: count = 29454 (should be 10000)
/***
scalaVersion := "2.11.7"
libraryDependencies += "com.typesafe.akka" %% "akka-typed-experimental" % "2.4.0"
*/
import akka.typed.ScalaDSL._
import akka.typed.{ ActorRef, ActorSystem, Behavior, Props, Terminated }
import akka.util.Timeout
import scala.concurrent.duration._
final val limit = 10000
sealed trait StateMessage extends Any with Product with Serializable
case object Increment extends StateMessage
case class Finish(label: String) extends StateMessage
final class SomeState {
private[this] var count = 0
val behavior: Behavior[StateMessage] = Total {
case Increment ⇒
count += 1
Same
case Finish(label) ⇒
println(s"[$label]: count = $count (should be $limit)")
Stopped
}
}
case object Run
val state = new SomeState
val guardian = ContextAware[Run.type] { ctx ⇒
val ref1 = ctx.watch(ctx.spawn(Props(state.behavior), "state-1"))
// problem is here, just spawn multiple actors that all encapsulate
// the same state will make the state shared and suddenly you have
// shared mutable state.
val ref2 = ctx.watch(ctx.spawn(Props(state.behavior), "state-2"))
val ref3 = ctx.watch(ctx.spawn(Props(state.behavior), "state-3"))
val allRefs = Set(ref1, ref2, ref3)
def stopping(refs: Set[ActorRef[_]]): Behavior[Run.type] = Full[Run.type] {
case Sig(_, Terminated(ref)) ⇒
val newRefs = refs - ref
if (newRefs.isEmpty) Stopped
else stopping(newRefs)
}
Total[Run.type] { run ⇒
for (i ← 1 to limit; ref ← allRefs) {
ref ! Increment
}
for (ref ← allRefs) {
ref ! Finish(ref.path.name)
}
stopping(allRefs.toSet)
}
}
val system = ActorSystem("Expose", Props(guardian))
implicit val timeout = Timeout(1.minute)
system ! Run
scala.concurrent.Await.result(system.whenTerminated, Duration.Inf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment