Accessing the Future API of an Akka Snapshot Store
package com.dominikdorn.akkatest | |
import akka.actor.ActorSystem | |
import akka.persistence.SnapshotSelectionCriteria | |
import akka.dominikdorn.SnapshotStoreGetter | |
import com.dominikdorn.akkatest.testentity.FilledState | |
/**
 * Example of reading a snapshot through the Future-based snapshot-store API.
 *
 * NOTE(review): `main` is an instance method here because `Example` is a class;
 * the JVM only launches a `main` that lives on an `object`. The class is kept
 * as-is so existing callers of `new Example().main(...)` keep working.
 */
class Example {

  /**
   * Asks the "jdbc-snapshot-store" plugin for a snapshot of "my-entity-1" matching
   * `maxSequenceNr = 500`, prints the entity name, and then shuts the actor system down.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import scala.util.{Failure, Success}

    implicit val actorSystem: ActorSystem = ActorSystem.create()
    implicit val ec: scala.concurrent.ExecutionContext = actorSystem.dispatcher

    val snapshotStore = SnapshotStoreGetter.getSnapshotStore("jdbc-snapshot-store")
    val maybeSnapshot =
      snapshotStore.loadAsync("my-entity-1", SnapshotSelectionCriteria(maxSequenceNr = 500))

    maybeSnapshot
      .andThen {
        case Success(Some(selectedSnapshot)) =>
          // Pattern match instead of casting, so an unexpected payload is reported
          // rather than surfacing as a swallowed ClassCastException.
          selectedSnapshot.snapshot match {
            case stateOfSnapshot: FilledState =>
              println("the name of the entity is = " + stateOfSnapshot.name)
            case other =>
              Console.err.println(s"snapshot payload is not a FilledState: $other")
          }
        case Success(None) =>
          Console.err.println("no snapshot matched the selection criteria")
        case Failure(cause) =>
          Console.err.println(s"loading the snapshot failed: $cause")
      }
      // Always release the actor system; otherwise its threads keep the JVM alive.
      .onComplete(_ => actorSystem.terminate())
  }
}
package akka.persistence | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
// This object gives you access to the Future-based API of a SnapshotStore, given the SnapshotStore's ActorRef
/**
 * Builds a [[SnapshotStoreBase]] that talks to a snapshot-store actor via the ask pattern.
 */
private[akka] object SnapshotStoreAccessor {

  /**
   * Wraps `ref` in a Future-based facade.
   *
   * @param ref     actor that receives the `SnapshotProtocol` requests
   * @param timeout ask timeout applied to every request; defaults to 5 seconds
   * @return a [[SnapshotStoreBase]] whose methods each send one request to `ref`
   */
  def apply(ref: ActorRef)(implicit timeout: Timeout = Timeout(5.seconds)): SnapshotStoreBase =
    new SnapshotStoreBase {
      import SnapshotProtocol._

      /**
       * Sends `message` to `ref`, narrows the reply to a `SnapshotProtocol.Response` and
       * translates it with `onReply`. Replies that `onReply` does not cover fail the
       * returned future with an `IllegalStateException`.
       */
      private def request[A](message: Any)(
          onReply: PartialFunction[SnapshotProtocol.Response, Future[A]]): Future[A] = {
        val reply = (ref ? message).mapTo[SnapshotProtocol.Response]
        // The translation only wraps values, so it runs on the completing thread.
        reply.flatMap { response =>
          onReply.applyOrElse(
            response,
            (_: SnapshotProtocol.Response) =>
              Future.failed[A](new IllegalStateException("unexpected result")))
        }(ExecutionContext.parasitic)
      }

      override def loadAsync(
          persistenceId: String,
          criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
        val query =
          LoadSnapshot(persistenceId, criteria = criteria, toSequenceNr = criteria.maxSequenceNr)
        request[Option[SelectedSnapshot]](query) {
          case LoadSnapshotResult(found, _) => Future.successful(found)
          case LoadSnapshotFailed(error)    => Future.failed(error)
        }
      }

      override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
        request[Unit](SaveSnapshot(metadata, snapshot)) {
          case SaveSnapshotSuccess(_)        => Future.successful(())
          case SaveSnapshotFailure(_, error) => Future.failed(error)
        }

      override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
        request[Unit](DeleteSnapshot(metadata)) {
          case DeleteSnapshotSuccess(_)        => Future.unit
          case DeleteSnapshotFailure(_, error) => Future.failed(error)
        }

      override def deleteAsync(
          persistenceId: String,
          criteria: SnapshotSelectionCriteria): Future[Unit] =
        request[Unit](DeleteSnapshots(persistenceId, criteria)) {
          case DeleteSnapshotsSuccess(_)        => Future.unit
          case DeleteSnapshotsFailure(_, error) => Future.failed(error)
        }
    }
}
package akka.persistence | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
/**
 * Future-based operations on a snapshot store: load, save and delete snapshots.
 *
 * NOTE(review): the original comments describe save/delete as "protected with a
 * circuit-breaker"; nothing in this trait enforces that, so treat it as a property
 * of the underlying store implementation and confirm before relying on it.
 */
trait SnapshotStoreBase {

  /**
   * Asynchronously loads a snapshot.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria      selection criteria for loading.
   * @return the selected snapshot, or `None` when there is none to return.
   */
  def loadAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

  /**
   * Asynchronously saves a snapshot.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot payload.
   */
  def saveAsync(
      metadata: SnapshotMetadata,
      snapshot: Any): Future[Unit]

  /**
   * Asynchronously deletes the snapshot identified by `metadata`.
   *
   * @param metadata snapshot metadata.
   */
  def deleteAsync(
      metadata: SnapshotMetadata): Future[Unit]

  /**
   * Asynchronously deletes all snapshots matching `criteria`.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria      selection criteria for deleting.
   */
  def deleteAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Future[Unit]
}
package akka.dominikdorn | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
/**
 * Entry point for obtaining a [[SnapshotStoreBase]] for a configured snapshot plugin.
 *
 * Declared under the `akka` package because it calls `SnapshotStoreAccessor`,
 * which is `private[akka]`.
 */
object SnapshotStoreGetter {

  /** Resolves the snapshot-store actor registered for `snapshotPluginId`. */
  private def getSnapshotStoreActorRef(system: ActorSystem, snapshotPluginId: String): ActorRef =
    Persistence(system).snapshotStoreFor(snapshotPluginId)

  /**
   * Returns a Future-based facade for the snapshot store of the given plugin.
   *
   * @param snapshotPluginId configuration id of the snapshot plugin
   * @param ac               actor system whose persistence extension is queried
   * @return facade built by `SnapshotStoreAccessor` with its default ask timeout
   */
  def getSnapshotStore(snapshotPluginId: String)(implicit ac: ActorSystem): SnapshotStoreBase =
    SnapshotStoreAccessor(getSnapshotStoreActorRef(ac, snapshotPluginId))

  /**
   * ZIO helper: same lookup as [[getSnapshotStore]], taking the `ActorSystem`
   * from the environment.
   *
   * Building the facade is synchronous, so it is a plain `map` over the service;
   * an exception thrown by the lookup surfaces as a defect, which matches the
   * `Nothing` error channel.
   *
   * @param snapshotPluginId configuration id of the snapshot plugin
   */
  def snapshotStore(snapshotPluginId: String): ZIO[Has[ActorSystem], Nothing, SnapshotStoreBase] =
    ZIO.service[ActorSystem].map(actorSystem => getSnapshotStore(snapshotPluginId)(actorSystem))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment