Created
April 12, 2024 13:23
-
-
Save Aaronontheweb/8ea821c9aa05cbeaea538b619e8d0248 to your computer and use it in GitHub Desktop.
Akka.NET - Deleting Older Snapshots in Akka.Persistence actors that ONLY use the SnapshotStore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Recovered state from snapshot. | |
Old snapshots deleted successfully. | |
Old snapshots deleted successfully. | |
Old snapshots deleted successfully. | |
Recovered state from snapshot. | |
Current recovered state of persistent actor is save-3-again | |
Checking for snapshot history of current actor... | |
[INFO][04/12/2024 13:21:34.943Z][Thread 0011][akka://Sys/user/exampleActor] Message [DeleteSnapshotsSuccess] from [akka://Sys/system/akka.persistence.snapshot-store.local#1481437221] to [akka://Sys/user/exampleActor#365343416] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://Sys/user/exampleActor#365343416] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 0, maxTimestamp: 2024/04/12, minSeqNr: 0, minTimestamp: 0001/01/01>> | |
Found snapshot [SnapshotMetadata<pid: example-persistent-actor, seqNr: 0, timestamp: 2024/04/12>] in LoadSnapshotResultHandler | |
Received null snapshot result - end of the line. | |
Found [1] snapshots for PersistentId[example-persistent-actor] | |
ValueTuple<Int64,DateTime,String>••• | |
(0, 4/12/2024 1:21:34 PM, save-3-again) | |
Item1 0 | |
Item2 4/12/2024 1:21:34 PM | |
Item3 save-3-again |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async Task Main() | |
{ | |
var actorSystem = ActorSystem.Create("Sys"); | |
var persistentActor = actorSystem.ActorOf<ExamplePersistentActor>("exampleActor"); | |
var watch = persistentActor.WatchAsync(); | |
// Sending multiple "save" commands to create snapshots | |
persistentActor.Tell("save-1"); | |
await Task.Delay(TimeSpan.FromSeconds(1)); // spacing out snapshots, time-wise | |
persistentActor.Tell("save-2"); | |
await Task.Delay(TimeSpan.FromSeconds(1)); // spacing out snapshots, time-wise | |
persistentActor.Tell("save-3"); | |
persistentActor.Tell(new ExamplePersistentActor.DieAfterSnapshot()); | |
persistentActor.Tell("save-3-again"); | |
await watch; | |
// recreate the actor | |
var persistentActor2 = actorSystem.ActorOf<ExamplePersistentActor>("exampleActor-reborn"); | |
var result = await persistentActor2.Ask<string>(new ExamplePersistentActor.CheckState()); | |
Console.WriteLine($"Current recovered state of persistent actor is {result}"); | |
var historyChecker = actorSystem.ActorOf<SnapshotLoader>("history-checker"); | |
Console.WriteLine($"Checking for snapshot history of current actor..."); | |
var results = await historyChecker.Ask<HashSet<(long seqNr, DateTime timestamp, string snapshot)>>(new SnapshotLoader.CheckSnapshots(ExamplePersistentActor.PersistId)); | |
Console.WriteLine($"Found [{results.Count}] snapshots for PersistentId[{ExamplePersistentActor.PersistId}]"); | |
foreach(var s in results){ | |
Console.WriteLine(s); | |
} | |
await actorSystem.Terminate(); | |
} | |
public class SnapshotLoader : ReceivePersistentActor | |
{ | |
public sealed record CheckSnapshots(string PersistentId); | |
// Skip recovery | |
public override Recovery Recovery => Recovery.None; | |
public override string PersistenceId => "snapshot-checker"; | |
private IActorRef _caller; | |
private string _persistentIdToCheck; | |
public SnapshotLoader(){ | |
Waiting(); | |
} | |
private void Waiting(){ | |
Command<CheckSnapshots>(c =>{ | |
_persistentIdToCheck = c.PersistentId; | |
_caller = Sender; | |
// load the newest snapshot | |
LoadSnapshot(_persistentIdToCheck, SnapshotSelectionCriteria.Latest, long.MaxValue); | |
Become(Checking); | |
}); | |
} | |
private void Checking(){ | |
var snapshotRecords = new HashSet<(long seqNr, DateTime timestamp, string snapshot)>(); | |
Command<LoadSnapshotResult>(loadResult =>{ | |
var offer = loadResult.Snapshot; | |
if (offer is null) { | |
Console.WriteLine("Received null snapshot result - end of the line."); | |
_caller.Tell(snapshotRecords); | |
Become(Waiting); | |
return; | |
} | |
Console.WriteLine($"Found snapshot [{offer.Metadata}] in LoadSnapshotResultHandler"); | |
snapshotRecords.Add((offer.Metadata.SequenceNr, offer.Metadata.Timestamp, offer.Snapshot as string)); | |
LoadSnapshot(_persistentIdToCheck, new SnapshotSelectionCriteria(offer.Metadata.SequenceNr, offer.Metadata.Timestamp - TimeSpan.FromMilliseconds(1)), long.MaxValue); | |
}); | |
Command<LoadSnapshotFailed>(loadResult => { | |
Console.WriteLine($"Received snapshot load failure - {loadResult.Cause} - end of the line"); | |
_caller.Tell(snapshotRecords); | |
Become(Waiting); | |
}); | |
} | |
} | |
// You can define other methods, fields, classes and namespaces here | |
public class ExamplePersistentActor : ReceivePersistentActor | |
{ | |
public sealed class CheckState{} | |
public sealed class DieAfterSnapshot{} | |
private string _state; // This is the state you want to persist. | |
private bool _dieAfterSnapshot = false; | |
public const string PersistId = "example-persistent-actor"; | |
public override string PersistenceId => PersistId; | |
public ExamplePersistentActor() | |
{ | |
// Command handler | |
Command<string>(cmd => | |
{ | |
_state = cmd; | |
SaveSnapshot(_state); | |
}); | |
Command<DieAfterSnapshot>(_ =>{ | |
_dieAfterSnapshot = true; | |
}); | |
Command<CheckState>(_ => { | |
Sender.Tell(_state); | |
}); | |
// Handle the successful saving of a snapshot | |
Command<SaveSnapshotSuccess>(success => | |
{ | |
// After successfully saving a snapshot, delete older snapshots | |
DeleteSnapshots(new SnapshotSelectionCriteria(0, success.Metadata.Timestamp - TimeSpan.FromMilliseconds(1))); | |
}); | |
// Handle the response of delete snapshots | |
Command<DeleteSnapshotsSuccess>(success => | |
{ | |
// Log or handle snapshot deletion success | |
Console.WriteLine("Old snapshots deleted successfully."); | |
if(_dieAfterSnapshot){ | |
Context.Stop(Self); | |
} | |
}); | |
// Recovery from snapshots | |
Recover<SnapshotOffer>(offer => | |
{ | |
// Recover the state from snapshot | |
_state = offer.Snapshot as string; | |
Console.WriteLine("Recovered state from snapshot."); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment