Last active
August 29, 2015 13:57
-
-
Save RayRoestenburg/9582319 to your computer and use it in GitHub Desktop.
Confirmed messages are not replayed if the same channel is used.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.persistence | |
import com.typesafe.config._ | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.persistence._ | |
import akka.testkit._ | |
/** Command that replaces the number held by a processor with `number`. */
final case class SetNumber(number: Int)

/** Command that adds `number` to the number held by a processor. */
final case class Add(number: Int)

/** Command that subtracts `number` from the number held by a processor. */
final case class Subtract(number: Int)

/** Command that lowers the held number by one and has the new value sent back to the requester. */
case object DecrementAndGet

/** Query that has the currently held number sent back to the requester without changing it. */
case object GetNumber
/**
 * Processor that keeps a single integer in `num` and updates it from incoming commands.
 *
 * `SetNumber`, `Add` and `Subtract` are only handled when wrapped in a `Persistent` message.
 * `DecrementAndGet` is handled both wrapped and bare; in either form the number is lowered by
 * one and the new value is sent to the sender. `GetNumber` sends the current value to the sender.
 *
 * @param name passed to `NamedProcessor` and also returned as this processor's `processorId`
 */
class NumberProcessor(name: String) extends NamedProcessor(name) {
  override def processorId: String = name

  var num = 0

  // Shared by the persistent and the plain DecrementAndGet cases.
  private def decrementAndReply(): Unit = {
    num -= 1
    sender ! num
  }

  def receive: Receive = {
    case Persistent(SetNumber(number), _) => num = number
    case Persistent(Add(number), _)       => num += number
    case Persistent(Subtract(number), _)  => num -= number
    case GetNumber                        => sender ! num
    // The wrapped and the bare command are treated the same way.
    case Persistent(DecrementAndGet, _) | DecrementAndGet => decrementAndReply()
  }
}
/**
 * Variant of the number processor that never replies to the sender directly: every answer is
 * wrapped in a `Deliver` request and handed to `channel`, addressed to the sender's path.
 *
 * `SetNumber`, `Add`, `Subtract` and `DecrementAndGet` are only handled when wrapped in a
 * `Persistent` message; `GetNumber` is handled bare.
 *
 * @param name    passed to `NamedProcessor` and also returned as this processor's `processorId`
 * @param channel actor that receives the `Deliver` requests produced by this processor
 */
class NumberProcessorWithChannel(name: String, channel: ActorRef) extends NamedProcessor(name) {
  override def processorId: String = name

  var num = 0

  // Hands `message` to the channel, addressed to the sender of the message being processed.
  private def deliverToSender(message: Persistent): Unit =
    channel ! Deliver(message, sender.path)

  def receive: Receive = {
    case Persistent(SetNumber(number), _) => num = number
    case Persistent(Add(number), _)       => num += number
    case Persistent(Subtract(number), _)  => num -= number
    case GetNumber                        => deliverToSender(Persistent(num))
    case p @ Persistent(DecrementAndGet, _) =>
      num -= 1
      // Reuse the incoming persistent message, swapping its payload for the new number.
      deliverToSender(p.withPayload(num))
  }
}
/**
 * Specs that stop a number processor and start a new one under the same name, then check which
 * values the clients observe afterwards.
 *
 * @param config configuration handed to `AkkaSpec`
 */
abstract class NumberProcessorSpec(config: Config) extends AkkaSpec(config)
  with PersistenceSpec {

  // Watches `ref`, stops it and waits for the Terminated notification on the spec's test actor.
  private def stopAndAwaitTermination(ref: ActorRef): Unit = {
    watch(ref)
    system.stop(ref)
    expectMsgType[Terminated]
  }

  "A processor" must {
    "resurrect with the correct state" in {
      val firstClient = TestProbe()
      val original = namedProcessor[NumberProcessor]

      original.tell(GetNumber, firstClient.testActor)
      firstClient.expectMsg(0)
      original.tell(Persistent(DecrementAndGet), firstClient.testActor)
      firstClient.expectMsg(-1)

      stopAndAwaitTermination(original)

      // A fresh probe is used here, so only the answer to the new command is checked.
      val resurrected = namedProcessor[NumberProcessor]
      val secondClient = TestProbe()
      resurrected.tell(Persistent(DecrementAndGet), secondClient.testActor)
      secondClient.expectMsg(-2)
    }

    "resurrect with the correct state, replaying state to previous clients" in {
      val client = TestProbe()
      val original = namedProcessor[NumberProcessor]

      original.tell(GetNumber, client.testActor)
      client.expectMsg(0)
      original.tell(Persistent(DecrementAndGet), client.testActor)
      client.expectMsg(-1)

      stopAndAwaitTermination(original)

      val resurrected = namedProcessor[NumberProcessor]
      resurrected.tell(Persistent(DecrementAndGet), client.testActor)
      // The same probe is expected to see -1 once more before -2; presumably the journaled
      // command is replayed to its original sender when the processor restarts.
      client.expectMsg(-1)
      client.expectMsg(-2)
    }
  }

  "A processor using a channel" must {
    "resurrect with the correct state, not replaying confirmed messages to clients, if the same channel is used" in {
      val confirmations = TestProbe()
      system.eventStream.subscribe(confirmations.testActor, classOf[Delivered])

      val client = TestProbe()
      val channel = system.actorOf(Channel.props(), name = "myChannel")

      // Receives the next delivery on `client`, confirms it, checks its payload and then waits
      // for a Delivered event on the event-stream subscriber.
      def confirmNext(expectedPayload: Int): Unit = {
        val delivery = client.expectMsgType[ConfirmablePersistent]
        delivery.confirm()
        delivery.payload should equal(expectedPayload)
        confirmations.expectMsgType[Delivered]
      }

      val original = system.actorOf(Props(classOf[NumberProcessorWithChannel], name, channel))
      original.tell(GetNumber, client.testActor)
      confirmNext(0)
      original.tell(Persistent(DecrementAndGet), client.testActor)
      confirmNext(-1)

      stopAndAwaitTermination(original)

      // The new processor instance is given the very same channel actor.
      val resurrected = system.actorOf(Props(classOf[NumberProcessorWithChannel], name, channel))
      resurrected.tell(Persistent(DecrementAndGet), client.testActor)
      confirmNext(-2)
    }
  }
}
/** Concrete spec: runs `NumberProcessorSpec` with `PersistenceSpec.config("leveldb", "NumberSpec")`. */
class NumberSpec
  extends NumberProcessorSpec(PersistenceSpec.config("leveldb", "NumberSpec"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment