Last active
March 22, 2016 15:12
-
-
Save ale64bit/f3a452b501b9bc8d31ba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.persistence.PersistentActor | |
import akka.persistence.inmemory.query.journal.scaladsl.InMemoryReadJournal | |
import akka.persistence.query.{EventEnvelope, PersistenceQuery} | |
import akka.stream.scaladsl.Sink | |
import akka.stream.{ActorMaterializer, ActorMaterializerSettings} | |
import akka.testkit.{TestKit, TestProbe} | |
import com.typesafe.config.ConfigFactory | |
import org.scalatest._ | |
class InMemJournalTest extends TestKit(ActorSystem("InMemJournalTest", ConfigFactory.parseString( | |
""" | |
|akka.actor.warn-about-java-serializer-usage = off | |
|akka.stream.materializer.debug.fuzzing-mode = on | |
|akka.persistence { | |
| journal.plugin = "inmemory-journal" | |
| snapshot-store.plugin = "inmemory-snapshot-store" | |
|} | |
""".stripMargin | |
))) | |
with FlatSpecLike | |
with Matchers | |
with BeforeAndAfterAll { | |
import scala.concurrent.duration._ | |
override def afterAll { | |
shutdown() | |
} | |
final lazy implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system)) | |
final val readJournal: InMemoryReadJournal = PersistenceQuery(system). | |
readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier) | |
class FooActor extends PersistentActor { | |
val persistenceId = "foo" | |
def receiveRecover: Receive = { case _ ⇒ } | |
def receiveCommand: Receive = { | |
case "foo" ⇒ persist("bar")(_ ⇒ ()) | |
} | |
} | |
class FooView(consumer: ActorRef) extends Actor with Stash { | |
case object ReplayCompleted | |
var lastSeqNr = 0l | |
readJournal.currentEventsByPersistenceId("foo", 0l, Long.MaxValue).runWith(Sink.actorRef(self, ReplayCompleted)) | |
def replaying: Receive = { | |
case ReplayCompleted ⇒ | |
readJournal.eventsByPersistenceId("foo", lastSeqNr + 1, Long.MaxValue).runWith(Sink.actorRef(self, "done")) | |
context become normal | |
unstashAll() | |
case EventEnvelope(seqNr, "foo", _, "bar") ⇒ | |
lastSeqNr = seqNr | |
case other ⇒ | |
stash() | |
} | |
def normal: Receive = { | |
case event ⇒ | |
consumer ! event | |
} | |
def receive = replaying | |
} | |
"FooView" should "receive the events in the journal eventually" in { | |
val consumer = TestProbe() | |
val fooView = system.actorOf(Props(new FooView(consumer.ref)), "fooxView") | |
// test passes if the delay is uncommented | |
// Thread sleep 1000 | |
val fooActor = system.actorOf(Props(new FooActor), "foo") | |
fooActor ! "foo" | |
consumer.expectMsg(10 seconds, EventEnvelope(1l, "foo", 1l, "bar")) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment