Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Handling an actor's idleness in Akka
package akka.actor
import scala.concurrent.duration._
/**
* <p>Thaasophobia is a fear of being idle, sitting.
* <p>WARNING: A thaasophobic actor with default behaviour could stop before
* concurrent operations complete:
* <ul>
* <li>Don't use future callbacks inside the actor
* <li>This actor should not expect to receive replies/ack messages
* </ul>
*/
trait Thaasophobia extends Actor {
def idleTimeout: Duration
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
msg match {
case ReceiveTimeout =>
handleIdleness()
case _ =>
super.aroundReceive(receive, msg)
}
def handleIdleness(): Unit = context.stop(self)
override def preStart(): Unit = context.setReceiveTimeout(idleTimeout)
}
package akka.actor
import akka.testkit.TestActors.EchoActor
import akka.testkit.{ImplicitSender, TestActorRef}
import me.crowdmix.traffic.actors.ActorSpec
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
class ThaasophobiaSpec extends ActorSpec("thaasophobia") with ImplicitSender {
"Thaasophobic actor" should {
"behave normally" in new Context {
thaasophobicActor ! Msg
expectMsg(Msg)
}
"stay alive" in new Context {
watch(thaasophobicActor)
import ExecutionContext.Implicits.global
system.scheduler.schedule(0.millis, _idleTimeout / 2, thaasophobicActor, Ignore)
expectNoMsg(1.second)
}
"stop" in new Context {
watch(thaasophobicActor)
expectTerminated(thaasophobicActor)
}
"stop before future completes" in new Context {
watch(thaasophobicActor)
thaasophobicActor ! Sleep((_idleTimeout * 1.2).toMillis)
expectTerminated(thaasophobicActor)
}
}
trait Context {
case object Msg
case object Ignore
case class Sleep(millis: Long)
case object Awake
val _idleTimeout = 100.millis
val thaasophobicActor =
TestActorRef(new EchoActor with Thaasophobia {
import ExecutionContext.Implicits.global
override val idleTimeout = _idleTimeout
override val receive: Receive = {
case Ignore =>
case Sleep(millis) =>
val _sender = sender()
Future {
Thread.sleep(millis)
_sender
}.onSuccess { case ref => ref ! Awake }
case msg =>
super.receive(msg)
}
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.