Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save RayRoestenburg/9595397 to your computer and use it in GitHub Desktop.
Save RayRoestenburg/9595397 to your computer and use it in GitHub Desktop.
package akka.persistence
import scala.language.postfixOps
import com.typesafe.config._
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.testkit._
// Message protocol understood by NumberProcessorWithPersistentChannel.
// The case classes are final so they cannot be subclassed, which keeps
// structural equality and pattern matching on them predictable.

/** Replaces the processor's current number with `number`. */
final case class SetNumber(number: Int)

/** Adds `number` to the processor's current number. */
final case class Add(number: Int)

/** Subtracts `number` from the processor's current number. */
final case class Subtract(number: Int)

/** Decrements the current number by one and requests the new value as a reply. */
case object DecrementAndGet

/** Requests the current number as a reply without modifying it. */
case object GetNumber
/**
 * Processor that maintains a single integer (`num`) and answers its clients
 * through a child `PersistentChannel` actor.
 *
 * State-changing commands (`SetNumber`, `Add`, `Subtract`, `DecrementAndGet`)
 * are only handled when wrapped in `Persistent`; `GetNumber` is handled as a
 * plain message.
 *
 * @param name used both as the `NamedProcessor` constructor argument and as
 *             this processor's `processorId`
 */
class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) {
  override def processorId = name

  // The number this processor keeps track of; starts at zero.
  var num = 0

  // Child channel used for every reply. It is created with a fixed channel id
  // ("stable_id") and a fixed actor name so both are the same on every start.
  val channel = {
    val settings = PersistentChannelSettings(redeliverInterval = 30.seconds, redeliverMax = 15)
    context.actorOf(
      PersistentChannel.props(channelId = "stable_id", settings),
      name = "myPersistentChannel")
  }

  def receive = {
    // Persistent commands that only mutate the number and send no reply.
    case Persistent(SetNumber(value), _) => num = value
    case Persistent(Add(amount), _) => num += amount
    case Persistent(Subtract(amount), _) => num -= amount

    // Plain (non-persistent) query: wrap the current number in a new
    // Persistent message and hand it to the channel, addressed to the sender.
    case GetNumber =>
      channel ! Deliver(Persistent(num), sender.path)

    // Persistent command: decrement, then reply through the channel by
    // re-using the incoming persistent message with the new number as payload.
    case request @ Persistent(DecrementAndGet, _) =>
      num -= 1
      channel ! Deliver(request.withPayload(num), sender.path)
  }
}
/**
 * Spec for a processor that replies through a persistent channel, checking
 * what a client receives before and after the processor is stopped and
 * created again under the same name.
 *
 * Abstract so that a concrete subclass can supply the configuration to run
 * against.
 *
 * @param config configuration passed on to `AkkaSpec`
 */
abstract class NumberProcessorSpec(config: Config) extends AkkaSpec(config)
  with PersistenceSpec {

  "A processor using a persistent channel" must {
    "resurrect with the correct state, not replaying confirmed messages to clients, although this test shows otherwise?" in {
      // Listens on the event stream for DeliveredByPersistentChannel events.
      val confirmations = TestProbe()
      system.eventStream.subscribe(confirmations.testActor, classOf[DeliveredByPersistentChannel])

      // Plays the role of the client that receives the processor's replies.
      val client = TestProbe()

      // Takes the next reply off the client probe, confirms it, checks its
      // payload, and then waits for the matching delivery event.
      def confirmNextReply(expectedPayload: Int): Unit = {
        val reply = client.expectMsgType[ConfirmablePersistent]
        reply.confirm()
        reply.payload should equal(expectedPayload)
        confirmations.expectMsgType[DeliveredByPersistentChannel]
        ()
      }

      val processor = namedProcessor[NumberProcessorWithPersistentChannel]

      // A fresh processor reports its initial number.
      processor.tell(GetNumber, client.testActor)
      confirmNextReply(0)

      // First persistent decrement: 0 -> -1.
      processor.tell(Persistent(DecrementAndGet), client.testActor)
      confirmNextReply(-1)

      // Stop the processor and wait until it has actually terminated.
      watch(processor)
      system.stop(processor)
      expectMsgType[Terminated]

      // Create the processor again under the same name and decrement once more.
      val resurrected = namedProcessor[NumberProcessorWithPersistentChannel]
      resurrected.tell(Persistent(DecrementAndGet), client.testActor)

      // Open question from the original author: the -1 reply that was already
      // confirmed above arrives at the client a second time here, which was
      // not expected.
      // NOTE(review): presumably the processor handles the journaled
      // DecrementAndGet again on restart and sends a new Deliver for it --
      // confirm against the Akka Persistence channel documentation.
      confirmNextReply(-1)

      // The reply to the new decrement: -1 -> -2.
      confirmNextReply(-2)
    }
  }
}
/**
 * Concrete run of [[NumberProcessorSpec]] using the configuration returned by
 * `PersistenceSpec.config("leveldb", "NumberSpec")`.
 */
class NumberSpec
  extends NumberProcessorSpec(
    PersistenceSpec.config("leveldb", "NumberSpec"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment