Skip to content

Instantly share code, notes, and snippets.

@domdorn
Last active February 6, 2021 17:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save domdorn/61bf5ef7922412a2e2666d723af88547 to your computer and use it in GitHub Desktop.
Save domdorn/61bf5ef7922412a2e2666d723af88547 to your computer and use it in GitHub Desktop.
Accessing the Future API of an Akka snapshot store
package com.dominikdorn.akkatest
import akka.actor.ActorSystem
import akka.persistence.SnapshotSelectionCriteria
import akka.dominikdorn.SnapshotStoreGetter
import com.dominikdorn.akkatest.testentity.FilledState
/**
 * Usage example: loads a snapshot through the Future-based snapshot-store API
 * and prints the name stored in it.
 *
 * NOTE(review): `main` is declared on a class, not an object, so it is an
 * instance method rather than a JVM entry point. Turn `Example` into an
 * `object` if it is meant to be launched directly.
 */
class Example {

  /**
   * Looks up the `jdbc-snapshot-store` plugin, loads the newest snapshot of
   * `my-entity-1` with a sequence number of at most 500, and prints the name
   * held by its `FilledState`.
   *
   * Every outcome is reported: a snapshot of an unexpected type, a missing
   * snapshot and a failed load are written to stderr. The actor system
   * created here is terminated once the result has been handled.
   *
   * @param args command line arguments (unused).
   */
  def main(args: Array[String]): Unit = {
    import scala.util.{Failure, Success}

    implicit val actorSystem: ActorSystem = ActorSystem.create()
    implicit val ec: scala.concurrent.ExecutionContext = actorSystem.dispatcher

    val snapshotStore = SnapshotStoreGetter.getSnapshotStore("jdbc-snapshot-store")
    val maybeSnapshot =
      snapshotStore.loadAsync("my-entity-1", SnapshotSelectionCriteria(maxSequenceNr = 500))

    maybeSnapshot
      // Keep only the payload so it can be matched by type instead of being cast.
      .map(_.map(_.snapshot))
      .andThen {
        case Success(Some(stateOfSnapshot: FilledState)) =>
          println(s"the name of the entity is = ${stateOfSnapshot.name}")
        case Success(Some(other)) =>
          Console.err.println(s"snapshot has unexpected type: ${other.getClass.getName}")
        case Success(None) =>
          Console.err.println("no snapshot found")
        case Failure(cause) =>
          Console.err.println(s"loading the snapshot failed: $cause")
      }
      // This method created the actor system, so it is also responsible for stopping it.
      .onComplete(_ => actorSystem.terminate())
  }
}
package akka.persistence
// code to accompany the blog post at
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/
// This class gives you access to the future based API of a SnapshotStore by providing the SnapshotStore's ActorRef
/**
 * Adapts the `ActorRef` of a snapshot store to the Future-based
 * [[SnapshotStoreBase]] API by sending it `SnapshotProtocol` requests with the
 * ask pattern and translating the replies.
 */
private[akka] object SnapshotStoreAccessor {
  import akka.actor.ActorRef
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.{ExecutionContext, Future}
  import scala.concurrent.duration._

  /**
   * Wraps `ref` in a [[SnapshotStoreBase]].
   *
   * @param ref     actor reference of the snapshot store to talk to.
   * @param timeout ask timeout applied to every request; 5 seconds unless an
   *                implicit `Timeout` is in scope or one is passed explicitly.
   * @return a facade whose futures fail with the cause reported by the store,
   *         or with an `IllegalStateException` if the store answers with a
   *         response that does not belong to the request.
   */
  def apply(ref: ActorRef)(
      implicit timeout: Timeout = Timeout(5.seconds)): SnapshotStoreBase =
    new SnapshotStoreBase {
      import SnapshotProtocol._

      /**
       * Asks `ref` with `message` and maps the reply through `handle`.
       * Replies that `handle` does not cover fail the future.
       */
      private def request[A](message: Any)(
          handle: PartialFunction[SnapshotProtocol.Response, Future[A]]): Future[A] =
        (ref ? message)
          .mapTo[SnapshotProtocol.Response]
          .flatMap[A] { response =>
            handle
              .lift(response)
              .getOrElse(Future.failed[A](new IllegalStateException(s"unexpected result: $response")))
          }(ExecutionContext.parasitic) // the mapping is trivial, so run it on the completing thread

      override def loadAsync(
          persistenceId: String,
          criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
        request[Option[SelectedSnapshot]](
          SnapshotProtocol.LoadSnapshot(persistenceId, criteria = criteria, toSequenceNr = criteria.maxSequenceNr)) {
          case LoadSnapshotResult(snapshot, _) => Future.successful(snapshot)
          case LoadSnapshotFailed(cause)       => Future.failed(cause)
        }

      override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
        request[Unit](SnapshotProtocol.SaveSnapshot(metadata, snapshot)) {
          case SaveSnapshotSuccess(_)        => Future.unit
          case SaveSnapshotFailure(_, cause) => Future.failed(cause)
        }

      override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
        request[Unit](SnapshotProtocol.DeleteSnapshot(metadata)) {
          case DeleteSnapshotSuccess(_)        => Future.unit
          case DeleteSnapshotFailure(_, cause) => Future.failed(cause)
        }

      override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
        request[Unit](SnapshotProtocol.DeleteSnapshots(persistenceId, criteria)) {
          case DeleteSnapshotsSuccess(_)        => Future.unit
          case DeleteSnapshotsFailure(_, cause) => Future.failed(cause)
        }
    }
}
package akka.persistence
// code to accompany the blog post at
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/
/**
 * Future-based operations of a snapshot store: load, save and delete snapshots.
 */
trait SnapshotStoreBase {
  import scala.concurrent.Future

  /**
   * Plugin API: asynchronously loads a snapshot.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria selection criteria for loading.
   * @return the selected snapshot, or `None` if no snapshot was selected.
   */
  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

  /**
   * Plugin API: asynchronously saves a snapshot.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.
   */
  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

  /**
   * Plugin API: deletes the snapshot identified by `metadata`.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param metadata snapshot metadata.
   */
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

  /**
   * Plugin API: deletes all snapshots matching `criteria`.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria selection criteria for deleting.
   */
  def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
}
package akka.dominikdorn
// code to accompany the blog post at
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/
/**
 * Entry point for obtaining the Future-based [[akka.persistence.SnapshotStoreBase]]
 * of a configured snapshot plugin.
 *
 * It lives under the `akka` package because it uses `SnapshotStoreAccessor`,
 * which is declared `private[akka]`.
 */
object SnapshotStoreGetter {
  import akka.actor.{ActorRef, ActorSystem}
  import akka.persistence.{Persistence, SnapshotStoreAccessor, SnapshotStoreBase}
  import zio.{Has, ZIO}

  /** Resolves the actor reference of the snapshot store registered under `snapshotPluginId`. */
  private def getSnapshotStoreActorRef(system: ActorSystem, snapshotPluginId: String): ActorRef =
    Persistence(system).snapshotStoreFor(snapshotPluginId)

  /**
   * Returns the Future-based API of the snapshot store identified by `snapshotPluginId`.
   *
   * No `Timeout` is supplied here, so requests use the default ask timeout of
   * `SnapshotStoreAccessor`.
   *
   * @param snapshotPluginId configuration id of the snapshot plugin.
   * @param ac               actor system whose persistence extension owns the plugin.
   */
  def getSnapshotStore(snapshotPluginId: String)(implicit ac: ActorSystem): SnapshotStoreBase =
    SnapshotStoreAccessor(getSnapshotStoreActorRef(ac, snapshotPluginId))

  // zio helper
  /**
   * ZIO variant of [[getSnapshotStore]]: takes the `ActorSystem` from the
   * environment instead of an implicit parameter.
   *
   * @param snapshotPluginId configuration id of the snapshot plugin.
   */
  def snapshotStore(snapshotPluginId: String): ZIO[Has[ActorSystem], Nothing, SnapshotStoreBase] =
    // The accessor is built synchronously, so no Future round trip is needed.
    ZIO.service[ActorSystem].map(actorSystem => getSnapshotStore(snapshotPluginId)(actorSystem))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment