Skip to content

Instantly share code, notes, and snippets.

@ale64bit
Last active March 22, 2016 15:12
Show Gist options
  • Save ale64bit/f3a452b501b9bc8d31ba to your computer and use it in GitHub Desktop.
Save ale64bit/f3a452b501b9bc8d31ba to your computer and use it in GitHub Desktop.
import akka.actor._
import akka.persistence.PersistentActor
import akka.persistence.inmemory.query.journal.scaladsl.InMemoryReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest._
class InMemJournalTest extends TestKit(ActorSystem("InMemJournalTest", ConfigFactory.parseString(
"""
|akka.actor.warn-about-java-serializer-usage = off
|akka.stream.materializer.debug.fuzzing-mode = on
|akka.persistence {
| journal.plugin = "inmemory-journal"
| snapshot-store.plugin = "inmemory-snapshot-store"
|}
""".stripMargin
)))
with FlatSpecLike
with Matchers
with BeforeAndAfterAll {
import scala.concurrent.duration._
override def afterAll {
shutdown()
}
final lazy implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
final val readJournal: InMemoryReadJournal = PersistenceQuery(system).
readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier)
class FooActor extends PersistentActor {
val persistenceId = "foo"
def receiveRecover: Receive = { case _ ⇒ }
def receiveCommand: Receive = {
case "foo" ⇒ persist("bar")(_ ⇒ ())
}
}
class FooView(consumer: ActorRef) extends Actor with Stash {
case object ReplayCompleted
var lastSeqNr = 0l
readJournal.currentEventsByPersistenceId("foo", 0l, Long.MaxValue).runWith(Sink.actorRef(self, ReplayCompleted))
def replaying: Receive = {
case ReplayCompleted ⇒
readJournal.eventsByPersistenceId("foo", lastSeqNr + 1, Long.MaxValue).runWith(Sink.actorRef(self, "done"))
context become normal
unstashAll()
case EventEnvelope(seqNr, "foo", _, "bar") ⇒
lastSeqNr = seqNr
case other ⇒
stash()
}
def normal: Receive = {
case event ⇒
consumer ! event
}
def receive = replaying
}
"FooView" should "receive the events in the journal eventually" in {
val consumer = TestProbe()
val fooView = system.actorOf(Props(new FooView(consumer.ref)), "fooxView")
// test passes if the delay is uncommented
// Thread sleep 1000
val fooActor = system.actorOf(Props(new FooActor), "foo")
fooActor ! "foo"
consumer.expectMsg(10 seconds, EventEnvelope(1l, "foo", 1l, "bar"))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment