Skip to content

Instantly share code, notes, and snippets.

@sebastianharko
Last active July 13, 2017 17:33
Show Gist options
  • Save sebastianharko/ffdc4f364809dab3797ecd6246da93e0 to your computer and use it in GitHub Desktop.
Save sebastianharko/ffdc4f364809dab3797ecd6246da93e0 to your computer and use it in GitHub Desktop.
Multiple persistence plugin configurations
// Run this in the Ammonite REPL (http://ammonite.io)
import $ivy.`com.typesafe.akka::akka-persistence:2.5.3`
import $ivy.`com.typesafe.akka::akka-persistence-cassandra:0.54`
import akka.actor.{ActorSystem, Props}
import akka.event.LoggingReceive
import akka.persistence.PersistentActor
import com.typesafe.config.ConfigFactory
/**
 * Closed set of consistency levels an actor can ask for when choosing its
 * journal configuration.
 *
 * The root mixes in `Product with Serializable` (which every case object
 * already implements) so that inferred common supertypes stay tidy.
 */
sealed trait CassandraConsistencyLevel extends Product with Serializable

/** Consistency level `One`. */
case object One extends CassandraConsistencyLevel

/** Consistency level `All`. */
case object All extends CassandraConsistencyLevel

/** Consistency level `Quorum`. */
case object Quorum extends CassandraConsistencyLevel
/**
 * Mix-in for persistent actors that selects the journal plugin from a
 * declared (write, read) consistency pair.
 *
 * The trait extends [[PersistentActor]] rather than only naming it as a
 * self-type: `journalPluginId` is overridden below, and a member that is merely
 * visible through a self-type is not an inherited member, so it cannot be the
 * target of `override`. Existing
 * `extends PersistentActor with CustomCassandraConsistencyLevels` declarations
 * keep working unchanged.
 */
trait CustomCassandraConsistencyLevels extends PersistentActor {

  /** Consistency level this actor wants for journal writes. */
  def writeConsistency: CassandraConsistencyLevel

  /** Consistency level this actor wants for journal reads. */
  def readConsistency: CassandraConsistencyLevel

  /**
   * Name of the journal plugin configuration matching the requested
   * (write, read) pair.
   *
   * @throws UnsupportedOperationException if no plugin is mapped to the pair
   */
  override def journalPluginId: String = (writeConsistency, readConsistency) match {
    case (One, All) => "cassandra_b"
    case (All, One) => "cassandra_a"
    // add more useful combinations
    case (write, read) =>
      // Name the offending pair so a misconfigured actor is easy to diagnose.
      throw new UnsupportedOperationException(
        s"not supported: no journal plugin configured for write=$write, read=$read")
  }
}
/** Root of the messages actors in this file accept as commands. */
sealed trait Command extends Product with Serializable

/** Root of the events actors in this file write to the journal. */
sealed trait Event extends Product with Serializable

/** Command carrying a greeting text. */
case class Hello(greeting: String) extends Command

/** Event recording that a greeting was said. */
case class SaidHello(greeting: String) extends Event
/**
 * Persistent actor that counts greetings.
 *
 * Every [[Hello]] command is persisted as a [[SaidHello]] event. The counter is
 * incremented in the persist callback and once per [[SaidHello]] replayed
 * during recovery, so the count is rebuilt from the journal after a restart.
 */
class HelloActor extends PersistentActor with CustomCassandraConsistencyLevels {
  override def writeConsistency: CassandraConsistencyLevel = One
  override def readConsistency: CassandraConsistencyLevel = All
  override def persistenceId: String = "hello-actor"

  /** Number of greetings recorded so far. */
  var hellos = 0

  // Recovery replays what was persisted, i.e. SaidHello events - not the
  // Hello commands that triggered them.
  override def receiveRecover: Receive = LoggingReceive {
    case SaidHello(_) =>
      hellos += 1
  }

  override def receiveCommand: Receive = LoggingReceive {
    case Hello(greeting) =>
      // Only count the greeting once the event has been persisted.
      persist(SaidHello(greeting)) { _ =>
        hellos += 1
      }
  }
}
/** Command asking [[PingPongActor]] to record a ping. */
case object Ping extends Command

/** Event recording that a ping was received. */
case object Pinged extends Event

/**
 * Persistent actor that counts pings.
 *
 * Every [[Ping]] command is persisted as a [[Pinged]] event. The counter is
 * incremented in the persist callback and once per [[Pinged]] replayed during
 * recovery, so the count is rebuilt from the journal after a restart.
 */
class PingPongActor extends PersistentActor with CustomCassandraConsistencyLevels {
  override def writeConsistency = All
  override def readConsistency = One
  override def persistenceId: String = "ping-pong-actor"

  /** Number of pings recorded so far. */
  var pings = 0

  // Recovery replays what was persisted, i.e. Pinged events - not the Ping
  // commands that triggered them.
  override def receiveRecover: Receive = LoggingReceive {
    case Pinged =>
      pings += 1
  }

  override def receiveCommand: Receive = LoggingReceive {
    case Ping =>
      // Only count the ping once the event has been persisted.
      persist(Pinged) { _ =>
        pings += 1
      }
  }
}
/**
 * Demo entry point: defines two journal configurations (`cassandra_a` and
 * `cassandra_b`) derived from `cassandra-journal`, starts an actor system with
 * them, and sends one command to each persistent actor.
 */
object Main extends App {
  // move this to application.conf
  // Both entries start as a copy of `cassandra-journal` and then override the
  // consistency settings; the `${...}` substitutions are resolved by
  // `cfg.resolve()` below.
  private val pluginOverrides =
    """
|cassandra_a = ${cassandra-journal}
|
|cassandra_b = ${cassandra-journal}
|
|cassandra_a {
|
| write-consistency = "ALL"
|
| read-consistency = "ONE"
|
|}
|
|cassandra_b {
|
| write-consistency = "ONE"
|
| read-consistency = "ALL"
|
|}""".stripMargin

  // The overrides are a fallback, so settings already present in the loaded
  // configuration take precedence.
  val cfg = ConfigFactory.load().withFallback(ConfigFactory.parseString(pluginOverrides))

  implicit val system: ActorSystem = ActorSystem("test", cfg.resolve())

  val actorA = system.actorOf(Props(new HelloActor), "actor-a")
  actorA ! Hello("how are you")

  val actorB = system.actorOf(Props(new PingPongActor), "actor-b")
  actorB ! Ping
}
@sebastianharko
Copy link
Author

Note: running this example requires a Cassandra cluster running locally.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment