import akka.persistence.PersistentActor
import akka.persistence.inmemory.query.journal.scaladsl.InMemoryReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import{ActorMaterializer, ActorMaterializerSettings}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest._
class InMemJournalTest extends TestKit(ActorSystem("InMemJournalTest", ConfigFactory.parseString(
| = off
| = on
|akka.persistence {
| journal.plugin = "inmemory-journal"
| snapshot-store.plugin = "inmemory-snapshot-store"
with FlatSpecLike
with Matchers
with BeforeAndAfterAll {
import scala.concurrent.duration._
override def afterAll {
final lazy implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
final val readJournal: InMemoryReadJournal = PersistenceQuery(system).
class FooActor extends PersistentActor {
val persistenceId = "foo"
def receiveRecover: Receive = { case _ ⇒ }
def receiveCommand: Receive = {
case "foo" ⇒ persist("bar")(_ ⇒ ())
class FooView(consumer: ActorRef) extends Actor with Stash {
case object ReplayCompleted
var lastSeqNr = 0l
readJournal.currentEventsByPersistenceId("foo", 0l, Long.MaxValue).runWith(Sink.actorRef(self, ReplayCompleted))
def replaying: Receive = {
case ReplayCompleted ⇒
readJournal.eventsByPersistenceId("foo", lastSeqNr + 1, Long.MaxValue).runWith(Sink.actorRef(self, "done"))
context become normal
case EventEnvelope(seqNr, "foo", _, "bar") ⇒
lastSeqNr = seqNr
case other ⇒
def normal: Receive = {
case event ⇒
consumer ! event
def receive = replaying
"FooView" should "receive the events in the journal eventually" in {
val consumer = TestProbe()
val fooView = system.actorOf(Props(new FooView(consumer.ref)), "fooxView")
// test passes if the delay is uncommented
// Thread sleep 1000
val fooActor = system.actorOf(Props(new FooActor), "foo")
fooActor ! "foo"
consumer.expectMsg(10 seconds, EventEnvelope(1l, "foo", 1l, "bar"))
