Skip to content

Instantly share code, notes, and snippets.

@Horusiath Horusiath/example.cs
Last active Jul 9, 2018

Embed
What would you like to do?
Generic persistent gateway for Akka.NET at-least-once-delivery semantics
// Delivery mechanism looks like this - if sender wants to reliably deliver payload to recipient
// using at-least-once delivery semantics, it sends that payload wrapped to Messenger actor, which
// is responsible for the persistence and redelivery:
//
// +--------+ +-----------+ +-----------+
// | |--(DeliverOrder<T>)-->| |--(Delivery<T>:1)-->| |
// | | | | /* 2nd attempt */ | |
// | Sender | | Messenger |--(Delivery<T>:2)-->| Recipient |
// | | | | | |
// | | | |<----(Confirm:2)----| |
// +--------+ +-----------+ +-----------+
public interface IMessengerCommand { }
// send from payload sender to at-least-once delivery actor
public sealed class DeliverOrder<T> : IMessengerCommand
{
public readonly IActorRef Recipient;
public readonly T Payload;
public DeliverOrder(IActorRef recipient, T payload)
{
Recipient = recipient;
Payload = payload;
}
}
// send from payload recipient to at-least-once delivery actor
public sealed class Confirm : IMessengerCommand
{
public readonly long DeliveryId;
public Confirm(long deliveryId)
{
DeliveryId = deliveryId;
}
}
// send from at-least-once delivery actor to payload recipient
public sealed class Delivery<T>
{
public readonly T Payload;
public readonly long DeliveryId;
public Delivery(T payload, long deliveryId)
{
Payload = payload;
DeliveryId = deliveryId;
}
}
public interface IMessengerEvent { }
public sealed class MessageSent<T> : IMessengerEvent
{
public readonly IActorRef Recipient;
public readonly T Payload;
public MessageSent(IActorRef recipient, T payload)
{
Payload = payload;
Recipient = recipient;
}
}
public sealed class MessageConfirmed : IMessengerEvent
{
public readonly long DeliveryId;
public MessageConfirmed(long deliveryId)
{
DeliveryId = deliveryId;
}
}
public sealed class Messenger<T> : AtLeastOnceDeliveryActor
{
// we'll use those numbers to create current state snapshot every 100 events saved
private const int SnapshotInterval = 100;
private int counter = 0;
public Messenger(string persistenceId)
{
PersistenceId = persistenceId;
}
public override string PersistenceId { get; }
protected override bool ReceiveRecover(object message) => message.Match()
// try to recover from the latests snapshot if possible
.With<SnapshotOffer>(offer => SetDeliverySnapshot((AtLeastOnceDeliverySnapshot)offer.Snapshot))
.With<IMessengerEvent>(UpdateState)
.WasHandled;
protected override bool ReceiveCommand(object message) => message.Match()
// send by the requestor to reliably deliver a message T to the recipient
.With<DeliverOrder<T>>(order => PersistEvent(new MessageSent<T>(order.Recipient, order.Payload)))
// send by the recipient to confirm that the message was delivered successfully
.With<Confirm>(confirm => PersistEvent(new MessageConfirmed(confirm.DeliveryId)))
// send by the snapshot store to confirm, that snapshot has been saved
.With<SaveSnapshotSuccess>(snapshotSaved =>
{
var snapshotSeqNr = snapshotSaved.Metadata.SequenceNr;
// delete all messages from journal and snapshot store before latests confirmed
// snapshot, we won't need them anymore
DeleteMessages(snapshotSeqNr);
DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSeqNr-1));
})
.WasHandled;
private void PersistEvent(IMessengerEvent e)
{
// persist event
Persist(e, UpdateState);
// check if it's turn to save at-least-once-delivery state into snapshot store
counter = (counter + 1)%SnapshotInterval;
if (counter == 0)
{
var snapshot = GetDeliverySnapshot();
SaveSnapshot(snapshot);
}
}
private void UpdateState(IMessengerEvent message) => message.Match()
// once message sent request has been stored, start delivery procedure to recipient
.With<MessageSent<T>>(sent => Deliver(sent.Recipient.Path, deliveryId => new Delivery<T>(sent.Payload, deliveryId)))
// once message confirmation has been stored, officially confirm that delivery
.With<MessageConfirmed>(confirmed => ConfirmDelivery(confirmed.DeliveryId));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.