Created
March 16, 2014 11:35
-
-
Save RayRoestenburg/9581923 to your computer and use it in GitHub Desktop.
Are confirmed messages replayed?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.persistence | |
import com.typesafe.config._ | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.persistence._ | |
import akka.testkit._ | |
case class SetNumber(number: Int) | |
case class Add(number: Int) | |
case class Subtract(number: Int) | |
case object DecrementAndGet | |
case object GetNumber | |
class NumberProcessor(name: String) extends NamedProcessor(name) { | |
override def processorId = name | |
var num = 0 | |
def receive = { | |
case Persistent(SetNumber(number), _) ⇒ num = number | |
case Persistent(Add(number), _) ⇒ num = num + number | |
case Persistent(Subtract(number), _) ⇒ num = num - number | |
case GetNumber ⇒ sender ! num | |
case Persistent(DecrementAndGet, _) ⇒ | |
num = num - 1 | |
sender ! num | |
case DecrementAndGet ⇒ | |
num = num - 1 | |
sender ! num | |
} | |
} | |
class NumberProcessorWithChannel(name: String) extends NamedProcessor(name) { | |
override def processorId = name | |
var num = 0 | |
val channel = context.actorOf(Channel.props(), name = "myChannel") | |
def receive = { | |
case Persistent(SetNumber(number), _) ⇒ num = number | |
case Persistent(Add(number), _) ⇒ num = num + number | |
case Persistent(Subtract(number), _) ⇒ num = num - number | |
case GetNumber ⇒ channel ! Deliver(Persistent(num), sender.path) | |
case p @ Persistent(DecrementAndGet, _) ⇒ | |
num = num - 1 | |
channel ! Deliver(p.withPayload(num), sender.path) | |
} | |
} | |
abstract class NumberProcessorSpec(config: Config) extends AkkaSpec(config) | |
with PersistenceSpec { | |
"A processor" must { | |
"resurrect with the correct state" in { | |
val probe = TestProbe() | |
val processor = namedProcessor[NumberProcessor] | |
processor.tell(GetNumber, probe.testActor) | |
probe.expectMsg(0) | |
processor.tell(Persistent(DecrementAndGet), probe.testActor) | |
probe.expectMsg(-1) | |
watch(processor) | |
system.stop(processor) | |
expectMsgType[Terminated] | |
val processorResurrected = namedProcessor[NumberProcessor] | |
val anotherProbe = TestProbe() | |
processorResurrected.tell(Persistent(DecrementAndGet), anotherProbe.testActor) | |
anotherProbe.expectMsg(-2) | |
} | |
"resurrect with the correct state, replaying state to previous clients" in { | |
val probe = TestProbe() | |
val processor = namedProcessor[NumberProcessor] | |
processor.tell(GetNumber, probe.testActor) | |
probe.expectMsg(0) | |
processor.tell(Persistent(DecrementAndGet), probe.testActor) | |
probe.expectMsg(-1) | |
watch(processor) | |
system.stop(processor) | |
expectMsgType[Terminated] | |
val processorResurrected = namedProcessor[NumberProcessor] | |
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor) | |
probe.expectMsg(-1) | |
probe.expectMsg(-2) | |
} | |
} | |
"A processor using a channel" must { | |
"resurrect with the correct state" in { | |
val deliveredProbe = TestProbe() | |
system.eventStream.subscribe(deliveredProbe.testActor, classOf[Delivered]) | |
val probe = TestProbe() | |
val processor = namedProcessor[NumberProcessorWithChannel] | |
processor.tell(GetNumber, probe.testActor) | |
val zero = probe.expectMsgType[ConfirmablePersistent] | |
zero.confirm() | |
zero.payload should equal(0) | |
deliveredProbe.expectMsgType[Delivered] | |
processor.tell(Persistent(DecrementAndGet), probe.testActor) | |
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent] | |
decrementFrom0.confirm() | |
decrementFrom0.payload should equal(-1) | |
deliveredProbe.expectMsgType[Delivered] | |
watch(processor) | |
system.stop(processor) | |
expectMsgType[Terminated] | |
val processorResurrected = namedProcessor[NumberProcessorWithChannel] | |
val anotherProbe = TestProbe() | |
processorResurrected.tell(Persistent(DecrementAndGet), anotherProbe.testActor) | |
val decrementFromMinus1 = anotherProbe.expectMsgType[ConfirmablePersistent] | |
decrementFromMinus1.confirm() | |
decrementFromMinus1.payload should equal(-2) | |
deliveredProbe.expectMsgType[Delivered] | |
} | |
"resurrect with the correct state, not replaying confirmed messages to clients?" in { | |
val deliveredProbe = TestProbe() | |
system.eventStream.subscribe(deliveredProbe.testActor, classOf[Delivered]) | |
val probe = TestProbe() | |
val processor = namedProcessor[NumberProcessorWithChannel] | |
processor.tell(GetNumber, probe.testActor) | |
val zero = probe.expectMsgType[ConfirmablePersistent] | |
zero.confirm() | |
zero.payload should equal(0) | |
deliveredProbe.expectMsgType[Delivered] | |
processor.tell(Persistent(DecrementAndGet), probe.testActor) | |
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent] | |
decrementFrom0.confirm() | |
decrementFrom0.payload should equal(-1) | |
deliveredProbe.expectMsgType[Delivered] | |
watch(processor) | |
system.stop(processor) | |
expectMsgType[Terminated] | |
val processorResurrected = namedProcessor[NumberProcessorWithChannel] | |
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor) | |
// I'm not expecting this to happen, since the message has already been confirmed? | |
val whyIsThisReceived = probe.expectMsgType[ConfirmablePersistent] | |
whyIsThisReceived.confirm() | |
whyIsThisReceived.payload should equal(-1) | |
deliveredProbe.expectMsgType[Delivered] | |
val decrementFromMinus1 = probe.expectMsgType[ConfirmablePersistent] | |
decrementFromMinus1.confirm() | |
decrementFromMinus1.payload should equal(-2) | |
deliveredProbe.expectMsgType[Delivered] | |
} | |
} | |
} | |
class NumberSpec extends NumberProcessorSpec(PersistenceSpec.config("leveldb", "NumberSpec")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment