Created
May 23, 2018 11:43
-
-
Save Sam13/4d4a0f66d8ab50990d9df4aaa2345de0 to your computer and use it in GitHub Desktop.
Akka Persistence Stash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using Akka.Actor; | |
using Akka.Persistence; | |
using Akka.TestKit.NUnit3; | |
using NUnit.Framework; | |
namespace AkkaTest.UnitTest | |
{ | |
[TestFixture] | |
internal sealed class SupervisorTest : TestKit | |
{ | |
private IActorRef m_actor; | |
private Supervisor m_actorConcrete; | |
[SetUp] | |
public void SetUp() | |
{ | |
var testActor = ActorOfAsTestActorRef<Supervisor>(Props.Create(() => new Supervisor())); | |
m_actor = testActor.Ref; | |
m_actorConcrete = testActor.UnderlyingActor; | |
} | |
[TearDown] | |
public void TearDown() | |
{ | |
} | |
[Test] | |
public void StartStashedWhenNotYetInitialized() | |
{ | |
using (var mre = new ManualResetEventSlim(false)) | |
{ | |
m_actorConcrete.Done += mre.Set; | |
var itemId = Guid.NewGuid(); | |
m_actor.Tell(new Confirmable<Started>(123, new Started(itemId))); | |
m_actor.Tell(new Confirmable<Pending>(124, new Pending(new List<Guid> {itemId}))); | |
Assert.IsTrue(mre.Wait(TimeSpan.FromMinutes(2))); | |
ExpectMsg<DeliveryConfirmation>(x => x.MessageId == 123); | |
ExpectMsg<DeliveryConfirmation>(x => x.MessageId == 124); | |
} | |
} | |
internal sealed class Confirmable<T> | |
{ | |
public Confirmable(long messageId, T message) | |
{ | |
MessageId = messageId; | |
Message = message; | |
} | |
public long MessageId { get; } | |
public T Message { get; } | |
} | |
internal sealed class DeliveryConfirmation | |
{ | |
public DeliveryConfirmation(long messageId) | |
{ | |
MessageId = messageId; | |
} | |
public long MessageId { get; } | |
} | |
internal sealed class Pending | |
{ | |
public Pending(IEnumerable<Guid> information) | |
{ | |
Ids = information ?? throw new ArgumentNullException(nameof(information)); | |
} | |
public IEnumerable<Guid> Ids { get; private set; } | |
} | |
internal sealed class Started | |
{ | |
public Started(Guid id) | |
{ | |
Id = id; | |
} | |
public Guid Id { get; private set; } | |
} | |
internal abstract class ConfirmablePersistentActor : AtLeastOnceDeliveryReceiveActor | |
{ | |
protected ConfirmablePersistentActor() | |
{ | |
Recover<DeliveryConfirmation>(OnReceive, null); | |
Command<DeliveryConfirmation>(OnCommand, null); | |
Recover<SnapshotOffer>(offer => offer.Snapshot is AtLeastOnceDeliverySnapshot, offer => | |
{ | |
var snapshot = offer.Snapshot as AtLeastOnceDeliverySnapshot; | |
SetDeliverySnapshot(snapshot); | |
}); | |
Command<SaveSnapshotSuccess>(saved => | |
{ | |
var seqNo = saved.Metadata.SequenceNr; | |
DeleteSnapshots(new SnapshotSelectionCriteria(seqNo, saved.Metadata.Timestamp.AddMilliseconds(-1))); | |
}); | |
} | |
private void OnCommand(DeliveryConfirmation message) | |
{ | |
Persist(message, OnReceive); | |
} | |
private void OnReceive(DeliveryConfirmation message) | |
{ | |
ConfirmDelivery(message.MessageId); | |
} | |
protected void ConfirmedSend<T>(IActorRef destination, T message) | |
{ | |
Deliver(destination.Path, id => new Confirmable<T>(id, message)); | |
SaveSnapshot(GetDeliverySnapshot()); | |
} | |
/*protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr) | |
{ | |
base.OnPersistFailure(cause, @event, sequenceNr); | |
} | |
protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr) | |
{ | |
base.OnPersistRejected(cause, @event, sequenceNr); | |
} | |
protected override void Unhandled(object message) | |
{ | |
base.Unhandled(message); | |
}*/ | |
protected void RegisterConfirmable<T>(Action<T> handler) | |
{ | |
Action<Confirmable<T>> confirmHandler = confirmable => | |
{ | |
Confirm(confirmable.MessageId); | |
handler(confirmable.Message); | |
}; | |
Recover<Confirmable<T>>(confirmHandler); | |
Command<Confirmable<T>>(confirmable => Persist(confirmable, confirmHandler)); | |
} | |
private void Confirm(long messageId) | |
{ | |
Sender.Tell(new DeliveryConfirmation(messageId), Self); | |
} | |
} | |
internal sealed class Supervisor : ConfirmablePersistentActor, IWithUnboundedStash | |
// ReceiveActor, IWithUnboundedStash | |
{ | |
private readonly List<Guid> m_ids; | |
public Supervisor() | |
{ | |
m_ids = new List<Guid>(); | |
/*Receive<Confirmable<Pending>>(OnReceive2, null); | |
Receive<Confirmable<Started>>(OnReceive2, null);*/ | |
PersistenceId = "Supervisor"; | |
RegisterConfirmable<Pending>(OnReceive); | |
RegisterConfirmable<Started>(OnReceive); | |
} | |
public event Action Done; | |
public override string PersistenceId { get; } | |
/*public new IStash Stash | |
{ | |
get; | |
set; | |
}*/ | |
protected override void PreStart() | |
{ | |
base.PreStart(); | |
m_ids.Clear(); | |
} | |
private bool StashIfNotYetInitialized() | |
{ | |
if (m_ids.Any()) | |
{ | |
return false; | |
} | |
Stash.Stash(); | |
return true; | |
} | |
/*private void OnReceive2(Confirmable<Pending> msg) | |
{ | |
OnReceive(msg.Message); | |
} | |
private void OnReceive2(Confirmable<Started> msg) | |
{ | |
OnReceive(msg.Message); | |
}*/ | |
private void OnReceive(Pending pending) | |
{ | |
foreach (var pendingId in pending.Ids) | |
{ | |
if (m_ids.Contains(pendingId)) | |
{ | |
continue; | |
} | |
m_ids.Add(pendingId); | |
} | |
Stash.UnstashAll(); | |
} | |
private void OnReceive(Started started) | |
{ | |
if (StashIfNotYetInitialized()) | |
{ | |
return; | |
} | |
Done?.Invoke(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment