-
-
Save domdorn/61bf5ef7922412a2e2666d723af88547 to your computer and use it in GitHub Desktop.
Accessing the Future API of a Akka Snapshot-store
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dominikdorn.akkatest | |
import akka.actor.ActorSystem | |
import akka.persistence.SnapshotSelectionCriteria | |
import akka.dominikdorn.SnapshotStoreGetter | |
import com.dominikdorn.akkatest.testentity.FilledState | |
class Example { | |
def main(args: Array[String]): Unit = { | |
implicit val actorSystem = ActorSystem.create() | |
implicit val ec = actorSystem.dispatcher | |
val snapshotStore = SnapshotStoreGetter.getSnapshotStore("jdbc-snapshot-store") | |
val maybeSnapshot = snapshotStore.loadAsync("my-entity-1", SnapshotSelectionCriteria(maxSequenceNr = 500)) | |
val filledState = maybeSnapshot.collect{ | |
case Some(selectedSnapshot) => selectedSnapshot.snapshot.asInstanceOf[FilledState] | |
} | |
filledState.foreach(stateOfSnapshot => { | |
println("the name of the entity is = " + stateOfSnapshot.name) | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.persistence | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
// This class gives you access to the future based API of a SnapshotStore by providing the SnapshotStore's ActorRef | |
private[akka] object SnapshotStoreAccessor { | |
def apply(ref: ActorRef)( | |
implicit timeout: Timeout = Timeout(5.seconds)): SnapshotStoreBase = | |
new SnapshotStoreBase { | |
import SnapshotProtocol._ | |
override def loadAsync( | |
persistenceId: String, | |
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = | |
(ref ? SnapshotProtocol.LoadSnapshot(persistenceId, criteria = criteria, toSequenceNr = criteria.maxSequenceNr)) | |
.mapTo[SnapshotProtocol.Response] | |
.flatMap { | |
case LoadSnapshotResult(snapshot, _) => Future.successful(snapshot) | |
case LoadSnapshotFailed(cause) => Future.failed(cause) | |
case _ => Future.failed(new IllegalStateException("unexpected result")) | |
}(ExecutionContext.parasitic) | |
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = | |
(ref ? SnapshotProtocol.SaveSnapshot(metadata, snapshot)) | |
.mapTo[SnapshotProtocol.Response] | |
.flatMap { | |
case SaveSnapshotSuccess(_) => Future.successful(()) | |
case SaveSnapshotFailure(_, cause) => Future.failed(cause) | |
case _ => Future.failed(new IllegalStateException("unexpected result")) | |
}(ExecutionContext.parasitic) | |
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = | |
(ref ? SnapshotProtocol.DeleteSnapshot(metadata)) | |
.mapTo[SnapshotProtocol.Response] | |
.flatMap { | |
case DeleteSnapshotSuccess(_) => Future.unit | |
case DeleteSnapshotFailure(_, cause) => Future.failed(cause) | |
case _ => Future.failed(new IllegalStateException("unexpected result")) | |
}(ExecutionContext.parasitic) | |
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = | |
(ref ? SnapshotProtocol.DeleteSnapshots(persistenceId, criteria)) | |
.mapTo[SnapshotProtocol.Response] | |
.flatMap { | |
case DeleteSnapshotsSuccess(_) => Future.unit | |
case DeleteSnapshotsFailure(_, cause) => Future.failed(cause) | |
case _ => Future.failed(new IllegalStateException("unexpected result")) | |
}(ExecutionContext.parasitic) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.persistence | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
trait SnapshotStoreBase { | |
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] | |
/** | |
* Plugin API: asynchronously saves a snapshot. | |
* | |
* This call is protected with a circuit-breaker. | |
* | |
* @param metadata snapshot metadata. | |
* @param snapshot snapshot. | |
*/ | |
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] | |
/** | |
* Plugin API: deletes the snapshot identified by `metadata`. | |
* | |
* This call is protected with a circuit-breaker. | |
* | |
* @param metadata snapshot metadata. | |
*/ | |
def deleteAsync(metadata: SnapshotMetadata): Future[Unit] | |
/** | |
* Plugin API: deletes all snapshots matching `criteria`. | |
* | |
* This call is protected with a circuit-breaker. | |
* | |
* @param persistenceId id of the persistent actor. | |
* @param criteria selection criteria for deleting. | |
*/ | |
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.dominikdorn | |
// code to accompany the blog post at | |
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/ | |
object SnapshotStoreGetter { | |
private def getSnapshotStoreActorRef(system: ActorSystem, snapshotPluginId: String): ActorRef = | |
Persistence(system).snapshotStoreFor(snapshotPluginId) | |
private def getSnapshotStoreFor(ref: ActorRef)( | |
implicit ec: ExecutionContext): SnapshotStoreBase = SnapshotStoreAccessor(ref) | |
def getSnapshotStore(snapshotPluginId: String)(implicit ac: ActorSystem): SnapshotStoreBase = { | |
val ref = getSnapshotStoreActorRef(ac, snapshotPluginId) | |
getSnapshotStoreFor(ref)(ac.dispatcher) | |
} | |
// zio helper | |
def snapshotStore(snapshotPluginId: String): ZIO[Has[ActorSystem], Nothing, SnapshotStoreBase] = | |
for { | |
actorSystem <- ZIO.service[ActorSystem] | |
ref = getSnapshotStoreActorRef(actorSystem, snapshotPluginId) | |
snapshotStore <- ZIO.fromFuture(implicit ec => Future.successful(getSnapshotStoreFor(ref))).orDie | |
} yield snapshotStore | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment