Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Created April 12, 2024 13:23
Show Gist options
  • Save Aaronontheweb/8ea821c9aa05cbeaea538b619e8d0248 to your computer and use it in GitHub Desktop.
Save Aaronontheweb/8ea821c9aa05cbeaea538b619e8d0248 to your computer and use it in GitHub Desktop.
Akka.NET - Deleting Older Snapshots in Akka.Persistence actors that ONLY use the SnapshotStore
Recovered state from snapshot.
Old snapshots deleted successfully.
Old snapshots deleted successfully.
Old snapshots deleted successfully.
Recovered state from snapshot.
Current recovered state of persistent actor is save-3-again
Checking for snapshot history of current actor...
[INFO][04/12/2024 13:21:34.943Z][Thread 0011][akka://Sys/user/exampleActor] Message [DeleteSnapshotsSuccess] from [akka://Sys/system/akka.persistence.snapshot-store.local#1481437221] to [akka://Sys/user/exampleActor#365343416] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://Sys/user/exampleActor#365343416] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 0, maxTimestamp: 2024/04/12, minSeqNr: 0, minTimestamp: 0001/01/01>>
Found snapshot [SnapshotMetadata<pid: example-persistent-actor, seqNr: 0, timestamp: 2024/04/12>] in LoadSnapshotResultHandler
Received null snapshot result - end of the line.
Found [1] snapshots for PersistentId[example-persistent-actor]
ValueTuple<Int64,DateTime,String>•••
(0, 4/12/2024 1:21:34 PM, save-3-again)
Item1 0
Item2 4/12/2024 1:21:34 PM
Item3 save-3-again
/// <summary>
/// Script entry point. Drives an <c>ExamplePersistentActor</c> through several
/// snapshot saves, waits for it to stop, starts a second instance to read the
/// recovered state, and finally asks a <c>SnapshotLoader</c> to list the snapshots
/// still stored for the example actor's persistence id.
/// </summary>
async Task Main()
{
    var actorSystem = ActorSystem.Create("Sys");
    try
    {
        var persistentActor = actorSystem.ActorOf<ExamplePersistentActor>("exampleActor");

        // Start watching before any message is sent so the termination cannot be missed.
        var watch = persistentActor.WatchAsync();

        // Sending multiple "save" commands to create snapshots
        persistentActor.Tell("save-1");
        await Task.Delay(TimeSpan.FromSeconds(1)); // spacing out snapshots, time-wise
        persistentActor.Tell("save-2");
        await Task.Delay(TimeSpan.FromSeconds(1)); // spacing out snapshots, time-wise
        persistentActor.Tell("save-3");

        // Ask the actor to stop once a snapshot deletion is acknowledged, then queue
        // one more save behind that request.
        persistentActor.Tell(new ExamplePersistentActor.DieAfterSnapshot());
        persistentActor.Tell("save-3-again");
        await watch;

        // recreate the actor (new actor name, same persistence id)
        var persistentActor2 = actorSystem.ActorOf<ExamplePersistentActor>("exampleActor-reborn");
        var result = await persistentActor2.Ask<string>(new ExamplePersistentActor.CheckState());
        Console.WriteLine($"Current recovered state of persistent actor is {result}");

        var historyChecker = actorSystem.ActorOf<SnapshotLoader>("history-checker");
        Console.WriteLine("Checking for snapshot history of current actor...");
        var results = await historyChecker.Ask<HashSet<(long seqNr, DateTime timestamp, string snapshot)>>(new SnapshotLoader.CheckSnapshots(ExamplePersistentActor.PersistId));
        Console.WriteLine($"Found [{results.Count}] snapshots for PersistentId[{ExamplePersistentActor.PersistId}]");
        foreach (var s in results)
        {
            Console.WriteLine(s);
        }
    }
    finally
    {
        // Shut the actor system down even when one of the awaits above faults,
        // so a failed run does not leave a live ActorSystem behind.
        await actorSystem.Terminate();
    }
}
/// <summary>
/// Utility actor that enumerates the snapshots stored for some other persistence id.
/// It loads the newest snapshot first and then keeps asking for the next older one
/// until the store returns no snapshot (or a load fails), at which point the
/// collected records are sent back to whoever issued <see cref="CheckSnapshots"/>.
/// </summary>
public class SnapshotLoader : ReceivePersistentActor
{
    /// <summary>Request to list the snapshots stored under <paramref name="PersistentId"/>.</summary>
    public sealed record CheckSnapshots(string PersistentId);

    // This actor has no state of its own to restore, so recovery is skipped.
    public override Recovery Recovery => Recovery.None;

    public override string PersistenceId => "snapshot-checker";

    // Sender of the request currently being served.
    private IActorRef _replyTo;

    // Persistence id whose snapshots are being enumerated.
    private string _targetPersistenceId;

    public SnapshotLoader() => Waiting();

    /// <summary>Idle behavior: accept a request and kick off the first snapshot load.</summary>
    private void Waiting()
    {
        Command<CheckSnapshots>(request =>
        {
            _targetPersistenceId = request.PersistentId;
            _replyTo = Sender;

            // Begin with the most recent snapshot.
            LoadSnapshot(_targetPersistenceId, SnapshotSelectionCriteria.Latest, long.MaxValue);
            Become(Checking);
        });
    }

    /// <summary>
    /// Active behavior: collect each loaded snapshot and request the next older one.
    /// A fresh result set is created every time this behavior is entered.
    /// </summary>
    private void Checking()
    {
        var found = new HashSet<(long seqNr, DateTime timestamp, string snapshot)>();

        Command<LoadSnapshotResult>(reply =>
        {
            var selected = reply.Snapshot;
            if (selected is not null)
            {
                var metadata = selected.Metadata;
                Console.WriteLine($"Found snapshot [{metadata}] in LoadSnapshotResultHandler");
                found.Add((metadata.SequenceNr, metadata.Timestamp, selected.Snapshot as string));

                // Next query: same upper sequence number, but strictly older than
                // the snapshot just seen (its timestamp minus one millisecond).
                var olderThanCurrent = new SnapshotSelectionCriteria(
                    metadata.SequenceNr,
                    metadata.Timestamp - TimeSpan.FromMilliseconds(1));
                LoadSnapshot(_targetPersistenceId, olderThanCurrent, long.MaxValue);
                return;
            }

            // No snapshot matched the criteria: the history has been exhausted.
            Console.WriteLine("Received null snapshot result - end of the line.");
            ReplyAndReset(found);
        });

        Command<LoadSnapshotFailed>(failure =>
        {
            // A failed load ends the walk; whatever was gathered so far is returned.
            Console.WriteLine($"Received snapshot load failure - {failure.Cause} - end of the line");
            ReplyAndReset(found);
        });
    }

    /// <summary>Sends the collected records to the requester and returns to the idle behavior.</summary>
    private void ReplyAndReset(HashSet<(long seqNr, DateTime timestamp, string snapshot)> records)
    {
        _replyTo.Tell(records);
        Become(Waiting);
    }
}
// You can define other methods, fields, classes and namespaces here
/// <summary>
/// Persistent actor that keeps a single string as its state and stores it only via
/// snapshots (it never persists events). After every successful snapshot save it
/// asks the snapshot store to delete snapshots older than the one just written.
/// </summary>
public class ExamplePersistentActor : ReceivePersistentActor
{
    /// <summary>Query message: the actor replies to the sender with its current state.</summary>
    public sealed class CheckState{}

    /// <summary>
    /// Requests that the actor stop itself the next time a snapshot operation
    /// (deletion, or a failed save/deletion) is reported back to it.
    /// </summary>
    public sealed class DieAfterSnapshot{}

    private string _state; // This is the state you want to persist.
    private bool _dieAfterSnapshot = false;

    public const string PersistId = "example-persistent-actor";
    public override string PersistenceId => PersistId;

    public ExamplePersistentActor()
    {
        // Command handler: every string becomes the new state and is snapshotted immediately.
        Command<string>(cmd =>
        {
            _state = cmd;
            SaveSnapshot(_state);
        });

        Command<DieAfterSnapshot>(_ =>
        {
            _dieAfterSnapshot = true;
        });

        Command<CheckState>(_ =>
        {
            Sender.Tell(_state);
        });

        // Handle the successful saving of a snapshot
        Command<SaveSnapshotSuccess>(success =>
        {
            // After successfully saving a snapshot, delete older snapshots: anything up to
            // sequence number 0 whose timestamp is at least 1 ms before the one just saved.
            DeleteSnapshots(new SnapshotSelectionCriteria(0, success.Metadata.Timestamp - TimeSpan.FromMilliseconds(1)));
        });

        // A failed save triggers no deletion, so without this handler a pending
        // DieAfterSnapshot request would never be honoured and watchers would wait forever.
        Command<SaveSnapshotFailure>(failure =>
        {
            Console.WriteLine($"Failed to save snapshot [{failure.Metadata}] - {failure.Cause}");
            StopIfRequested();
        });

        // Handle the response of delete snapshots
        Command<DeleteSnapshotsSuccess>(success =>
        {
            // Log or handle snapshot deletion success
            Console.WriteLine("Old snapshots deleted successfully.");
            StopIfRequested();
        });

        // Same reasoning as for failed saves: report the problem and still honour
        // a pending stop request instead of leaving the actor running.
        Command<DeleteSnapshotsFailure>(failure =>
        {
            Console.WriteLine($"Failed to delete old snapshots [{failure.Criteria}] - {failure.Cause}");
            StopIfRequested();
        });

        // Recovery from snapshots
        Recover<SnapshotOffer>(offer =>
        {
            // Recover the state from snapshot
            _state = offer.Snapshot as string;
            Console.WriteLine("Recovered state from snapshot.");
        });
    }

    /// <summary>Stops this actor when a <see cref="DieAfterSnapshot"/> request has been received.</summary>
    private void StopIfRequested()
    {
        if (_dieAfterSnapshot)
        {
            Context.Stop(Self);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment