Skip to content

Instantly share code, notes, and snippets.

@richardimaoka
Created December 30, 2017 11:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save richardimaoka/d5183d783cdf04ff97e1e9f5f570d369 to your computer and use it in GitHub Desktop.
Save richardimaoka/d5183d783cdf04ff97e1e9f5f570d369 to your computer and use it in GitHub Desktop.
package example
import akka.persistence.typed.scaladsl.PersistentActor
import akka.persistence.typed.scaladsl.PersistentActor.{ Effect, CommandHandler }
import akka.actor.typed.{ Behavior, Terminated }
import akka.actor.typed.scaladsl.Actor
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
/**
 * Small runnable example of an Akka Typed persistent actor.
 *
 * `main` starts an untyped `ActorSystem`, spawns [[mainBehavior]] through the
 * typed adapter, and that behavior in turn spawns the persistent actor defined
 * by [[behavior]] and sends it three commands.
 */
object AkkaTypedPersistenceNoAdapter {

  /** Command accepted by the persistent actor; `str` is the payload to persist. */
  final case class Command(str: String)

  /** Event created from a [[Command]], carrying the same string. */
  final case class Event(str: String)

  /** State of the persistent actor: a string built up from the applied events. */
  final case class State(str: String)

  /**
   * Maps every [[Command]] to an [[Event]] with the same payload, asks for it to
   * be persisted, and registers an `andThen` callback that prints the event.
   *
   * The actor context and the current state are not needed, hence the `_`
   * placeholders.
   */
  private val commandHandler: CommandHandler[Command, Event, State] =
    CommandHandler { (_, _, cmd) =>
      cmd match {
        case Command(str) =>
          val event = Event(str)
          Effect.persist(event).andThen { _ =>
            println(s"persisted ${event}")
          }
      }
    }

  /**
   * Applies an event to the current state.
   *
   * @param state the state before the event
   * @param event the event to apply
   * @return a new [[State]] whose string is the old string, then `", "`, then
   *         the event's string
   */
  private def eventHandler(state: State, event: Event): State =
    event match {
      case Event(str) =>
        State(s"${state.str}, $str")
    }

  /**
   * Behavior of the persistent actor: persistence id `"mypid"`, starting from
   * `State("initial state")`, wired to [[commandHandler]] and [[eventHandler]].
   */
  def behavior: Behavior[Command] =
    PersistentActor.immutable[Command, Event, State](
      persistenceId = "mypid",
      initialState = State("initial state"),
      commandHandler = commandHandler,
      eventHandler = eventHandler,
      //taggingFunction = _ => Seq("mytag1", "mytag2")
    )

  /**
   * Parent behavior: spawns the persistent actor as child `"persistent1"`,
   * sends it three commands and stops itself once the child has terminated.
   *
   * Incoming messages are left unhandled.
   */
  def mainBehavior: Behavior[akka.NotUsed] =
    Actor.deferred { ctx =>
      val persistActorRef = ctx.spawn(behavior, "persistent1")
      // Terminated is only delivered for watched actors; without this call the
      // onSignal handler below could never fire.
      ctx.watch(persistActorRef)

      persistActorRef ! Command("abc")
      persistActorRef ! Command("def")
      persistActorRef ! Command("ghi")

      Actor.immutable[akka.NotUsed] {
        (_, _) => Actor.unhandled
      } onSignal {
        case (_, Terminated(_)) =>
          Actor.stopped
      }
    }

  /**
   * Entry point: builds the configuration (Cassandra journal plugin, local
   * snapshot store), starts the actor system, spawns [[mainBehavior]] under the
   * name `"guardian"`, waits five seconds and then terminates the system.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The cassandra-journal block is intentionally empty: no overrides are set here.
    val config = ConfigFactory.parseString("""
      |cassandra-journal {
      |}
      |akka {
      | actor.warn-about-java-serializer-usage = off
      | persistence {
      | journal {
      | plugin = "cassandra-journal"
      | }
      | snapshot-store {
      | plugin = "akka.persistence.snapshot-store.local"
      | }
      | }
      |}""".stripMargin
    )
    val system = ActorSystem("mySystem", config)
    try {
      // Adds the typed `spawn` extension method to the untyped ActorSystem.
      import akka.actor.typed.scaladsl.adapter._
      system.spawn(mainBehavior, "guardian")
      // Fixed pause before shutdown — presumably to give the actors time to
      // persist the three commands; this is not a real synchronization point.
      Thread.sleep(5000)
    } finally {
      println("terminating the system")
      system.terminate()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment