Skip to content

Instantly share code, notes, and snippets.

@RayRoestenburg
Created March 16, 2014 11:35
Show Gist options
  • Save RayRoestenburg/9581923 to your computer and use it in GitHub Desktop.
Save RayRoestenburg/9581923 to your computer and use it in GitHub Desktop.
Are confirmed messages replayed?
package akka.persistence
import com.typesafe.config._
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.testkit._
case class SetNumber(number: Int)
case class Add(number: Int)
case class Subtract(number: Int)
case object DecrementAndGet
case object GetNumber
class NumberProcessor(name: String) extends NamedProcessor(name) {
override def processorId = name
var num = 0
def receive = {
case Persistent(SetNumber(number), _) ⇒ num = number
case Persistent(Add(number), _) ⇒ num = num + number
case Persistent(Subtract(number), _) ⇒ num = num - number
case GetNumber ⇒ sender ! num
case Persistent(DecrementAndGet, _) ⇒
num = num - 1
sender ! num
case DecrementAndGet ⇒
num = num - 1
sender ! num
}
}
class NumberProcessorWithChannel(name: String) extends NamedProcessor(name) {
override def processorId = name
var num = 0
val channel = context.actorOf(Channel.props(), name = "myChannel")
def receive = {
case Persistent(SetNumber(number), _) ⇒ num = number
case Persistent(Add(number), _) ⇒ num = num + number
case Persistent(Subtract(number), _) ⇒ num = num - number
case GetNumber ⇒ channel ! Deliver(Persistent(num), sender.path)
case p @ Persistent(DecrementAndGet, _) ⇒
num = num - 1
channel ! Deliver(p.withPayload(num), sender.path)
}
}
abstract class NumberProcessorSpec(config: Config) extends AkkaSpec(config)
with PersistenceSpec {
"A processor" must {
"resurrect with the correct state" in {
val probe = TestProbe()
val processor = namedProcessor[NumberProcessor]
processor.tell(GetNumber, probe.testActor)
probe.expectMsg(0)
processor.tell(Persistent(DecrementAndGet), probe.testActor)
probe.expectMsg(-1)
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessor]
val anotherProbe = TestProbe()
processorResurrected.tell(Persistent(DecrementAndGet), anotherProbe.testActor)
anotherProbe.expectMsg(-2)
}
"resurrect with the correct state, replaying state to previous clients" in {
val probe = TestProbe()
val processor = namedProcessor[NumberProcessor]
processor.tell(GetNumber, probe.testActor)
probe.expectMsg(0)
processor.tell(Persistent(DecrementAndGet), probe.testActor)
probe.expectMsg(-1)
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessor]
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor)
probe.expectMsg(-1)
probe.expectMsg(-2)
}
}
"A processor using a channel" must {
"resurrect with the correct state" in {
val deliveredProbe = TestProbe()
system.eventStream.subscribe(deliveredProbe.testActor, classOf[Delivered])
val probe = TestProbe()
val processor = namedProcessor[NumberProcessorWithChannel]
processor.tell(GetNumber, probe.testActor)
val zero = probe.expectMsgType[ConfirmablePersistent]
zero.confirm()
zero.payload should equal(0)
deliveredProbe.expectMsgType[Delivered]
processor.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent]
decrementFrom0.confirm()
decrementFrom0.payload should equal(-1)
deliveredProbe.expectMsgType[Delivered]
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessorWithChannel]
val anotherProbe = TestProbe()
processorResurrected.tell(Persistent(DecrementAndGet), anotherProbe.testActor)
val decrementFromMinus1 = anotherProbe.expectMsgType[ConfirmablePersistent]
decrementFromMinus1.confirm()
decrementFromMinus1.payload should equal(-2)
deliveredProbe.expectMsgType[Delivered]
}
"resurrect with the correct state, not replaying confirmed messages to clients?" in {
val deliveredProbe = TestProbe()
system.eventStream.subscribe(deliveredProbe.testActor, classOf[Delivered])
val probe = TestProbe()
val processor = namedProcessor[NumberProcessorWithChannel]
processor.tell(GetNumber, probe.testActor)
val zero = probe.expectMsgType[ConfirmablePersistent]
zero.confirm()
zero.payload should equal(0)
deliveredProbe.expectMsgType[Delivered]
processor.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent]
decrementFrom0.confirm()
decrementFrom0.payload should equal(-1)
deliveredProbe.expectMsgType[Delivered]
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessorWithChannel]
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor)
// I'm not expecting this to happen, since the message has already been confirmed?
val whyIsThisReceived = probe.expectMsgType[ConfirmablePersistent]
whyIsThisReceived.confirm()
whyIsThisReceived.payload should equal(-1)
deliveredProbe.expectMsgType[Delivered]
val decrementFromMinus1 = probe.expectMsgType[ConfirmablePersistent]
decrementFromMinus1.confirm()
decrementFromMinus1.payload should equal(-2)
deliveredProbe.expectMsgType[Delivered]
}
}
}
class NumberSpec extends NumberProcessorSpec(PersistenceSpec.config("leveldb", "NumberSpec"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment