Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Last active March 5, 2024 16:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aaronontheweb/252fd496b7107a0a466fb2e5d609d9e3 to your computer and use it in GitHub Desktop.
Save Aaronontheweb/252fd496b7107a0a466fb2e5d609d9e3 to your computer and use it in GitHub Desktop.
Read Journal Provider
// Handles ProjectionStarting: logs the tag and current offset, then replies
// to the sender with ProjectionAck.
Command<ProjectionStarting>(_ =>
{
    // TODO: seeing multiple of these being logged, which makes me think there's a problem with our Akka.Streams graph
    // The actor UID and a random number are included in the log line so that
    // repeated log entries can be told apart.
    var tag = NuGetPersistenceTags.NuGetProductTag;
    var startingOffset = CurrentState.LastOffset.AsLong();
    var instanceUid = Self.Path.Uid;
    var nonce = Random.Shared.Next();

    _log.Info(
        "Projection for Tag [{0}] is starting from Offset [{1}] - instance [{2}] rand [{3}]",
        tag,
        startingOffset,
        instanceUid,
        nonce);

    Sender.Tell(ProjectionAck.Instance);
});
// Handles ProjectionCompleted: logs the tag and the last recorded offset.
Command<ProjectionCompleted>(_ =>
{
    var tag = NuGetPersistenceTags.NuGetProductTag;
    var finalOffset = CurrentState.LastOffset.AsLong();

    _log.Info("Projection completed for Tag [{0}] at Offset [{1}]", tag, finalOffset);
});
// Handles ProjectionFailed: logs the failure cause with the tag and last
// recorded offset, then rethrows it wrapped in an ApplicationException.
Command<ProjectionFailed>(failure =>
{
    var cause = failure.Cause;
    var tag = NuGetPersistenceTags.NuGetProductTag;
    var failedOffset = CurrentState.LastOffset.AsLong();

    _log.Error(cause, "Projection FAILED for Tag [{0}] at Offset [{1}]", tag, failedOffset);

    // The original cause is preserved as the InnerException.
    throw new ApplicationException(
        "Projection failed due to error. See InnerException for details.",
        cause);
});
// Handles SaveSnapshotSuccess: once a snapshot has been saved, purge the
// older messages and snapshots.
Command<SaveSnapshotSuccess>(saved =>
{
    var snapshotSeqNr = saved.Metadata.SequenceNr;

    // Messages are deleted using the snapshot's own sequence number.
    DeleteMessages(snapshotSeqNr);

    // Snapshots are selected with (sequence number - 1), i.e. one below the
    // snapshot that was just saved.
    var olderSnapshots = new SnapshotSelectionCriteria(snapshotSeqNr - 1);
    DeleteSnapshots(olderSnapshots);
});
/// <summary>
/// Supplies the read-journal query interfaces for a given <see cref="ActorSystem"/>.
/// </summary>
public interface IReadJournalProvider
{
    /// <summary>Gets the all-events query for <paramref name="system"/>.</summary>
    /// <param name="system">The actor system to obtain the query from.</param>
    /// <returns>An <see cref="IAllEventsQuery"/> instance.</returns>
    IAllEventsQuery GetAllEventsQuery(ActorSystem system);

    /// <summary>Gets the events-by-tag query for <paramref name="system"/>.</summary>
    /// <param name="system">The actor system to obtain the query from.</param>
    /// <returns>An <see cref="IEventsByTagQuery"/> instance.</returns>
    IEventsByTagQuery GetEventsByTagQuery(ActorSystem system);

    /// <summary>Gets the events-by-persistence-id query for <paramref name="system"/>.</summary>
    /// <param name="system">The actor system to obtain the query from.</param>
    /// <returns>An <see cref="IEventsByPersistenceIdQuery"/> instance.</returns>
    IEventsByPersistenceIdQuery GetAllEventsByPersistenceIdQuery(ActorSystem system);
}
/// <summary>
/// <see cref="IReadJournalProvider"/> implementation that serves every query
/// from the <see cref="InMemoryReadJournal"/> registered under
/// <see cref="InMemoryReadJournal.Identifier"/>.
/// </summary>
public sealed class InMemoryReadJournalProvider : IReadJournalProvider
{
    /// <summary>Returns the in-memory read journal as an <see cref="IEventsByTagQuery"/>.</summary>
    /// <param name="system">The actor system whose persistence-query extension is used.</param>
    public IEventsByTagQuery GetEventsByTagQuery(ActorSystem system) => ResolveJournal(system);

    /// <summary>Returns the in-memory read journal as an <see cref="IAllEventsQuery"/>.</summary>
    /// <param name="system">The actor system whose persistence-query extension is used.</param>
    public IAllEventsQuery GetAllEventsQuery(ActorSystem system) => ResolveJournal(system);

    /// <summary>Returns the in-memory read journal as an <see cref="IEventsByPersistenceIdQuery"/>.</summary>
    /// <param name="system">The actor system whose persistence-query extension is used.</param>
    public IEventsByPersistenceIdQuery GetAllEventsByPersistenceIdQuery(ActorSystem system) => ResolveJournal(system);

    // Single lookup shared by all three query accessors.
    private static InMemoryReadJournal ResolveJournal(ActorSystem system)
    {
        var persistenceQuery = PersistenceQuery.Get(system);
        return persistenceQuery.ReadJournalFor<InMemoryReadJournal>(InMemoryReadJournal.Identifier);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment