Skip to content

Instantly share code, notes, and snippets.

@PowerMogli
Last active November 16, 2016 15:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PowerMogli/10e5c92933dfeb7f3d4c3af738bf72ad to your computer and use it in GitHub Desktop.
Save PowerMogli/10e5c92933dfeb7f3d4c3af738bf72ad to your computer and use it in GitHub Desktop.
NaipActorSystem = ActorSystem.Create("ScmActorSystem", AkkaSettings.GetConfiguration());
NaipActorSystem.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "webRouter");
public class OrderPersistentActor : AtLeastOnceDeliveryReceiveActor
{
#region Fields
private readonly ActorPath _actorPath = ActorPath.Parse(ActorPaths.WebRoutingActor.Path);
private ICancelable _cancelable;
#endregion
#region Construction
/// <summary>
/// Initializes a new instance of the <see cref="OrderPersistentActor"/> class.
/// </summary>
public OrderPersistentActor()
{
Connected();
Recover<SnapshotOffer>(offer => SetDeliverySnapshot((global::Akka.Persistence.AtLeastOnceDeliverySnapshot)offer.Snapshot));
}
#endregion
#region Properties
/// <summary>
/// Id of the persistent entity for which messages should be replayed.
/// </summary>
public override string PersistenceId => Context.Self.Path.Name;
#endregion
#region Private Methods
private void Connected()
{
IActorRef self = Self;
Context.System.ActorSelection(_actorPath)
.Ask<ActorIdentity>(new Identify(null))
.PipeTo(self);
ReadUnconfirmed();
Command<ChangePrescriptionStatusToIssued>(msg =>
{
Deliver(
_actorPath,
messageId => new ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>(new ChangePrescriptionStatusToIssued(msg.OrderId, msg.UserName), messageId));
SaveSnapshot();
});
Command<ChangePrescriptionStatusToNotIssued>(msg =>
{
Deliver(
_actorPath,
messageId => new ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>(new ChangePrescriptionStatusToNotIssued(msg.OrderId, msg.UserName, msg.Comment), messageId));
SaveSnapshot();
});
Command<SaveSnapshotSuccess>(snapshotSaved =>
{
long snapshotSeqNr = snapshotSaved.Metadata.SequenceNr;
DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSeqNr - 1));
});
Command<UnconfirmedWarning>(msg =>
{
foreach (UnconfirmedDelivery unconfirmedDelivery in msg.UnconfirmedDeliveries)
{
ConfirmDelivery(unconfirmedDelivery.DeliveryId);
}
});
Command<ActorIdentity>(actorIdentity =>
{
if (actorIdentity?.Subject == null)
{
Log.Error($"Remote actor '{ActorPaths.WebRoutingActor.Name}' could not be located on scm2.0-server.");
Become(Disconnected);
return;
}
Context.Watch(actorIdentity.Subject);
});
Command<Terminated>(msg => Become(Disconnected));
Command<ReliableDeliveryAck>(ack =>
{
ConfirmDelivery(ack.MessageId);
DeleteSnapshots(SnapshotSelectionCriteria.Latest);
SaveSnapshot();
});
}
private void Disconnected()
{
ReadUnconfirmed();
Command<UnconfirmedWarning>(msg =>
{
foreach (UnconfirmedDelivery unconfirmedDelivery in msg.UnconfirmedDeliveries)
{
ConfirmDelivery(unconfirmedDelivery.DeliveryId);
}
});
Command<ChangePrescriptionStatusToIssued>(msg => Stash.Stash());
Command<ChangePrescriptionStatusToNotIssued>(msg => Stash.Stash());
Command<Reconnect>(msg =>
{
ActorIdentity actorIdentity = Context.System.ActorSelection(_actorPath)
.Ask<ActorIdentity>(new Identify(null))
.Result;
if (actorIdentity?.Subject == null)
{
return;
}
_cancelable.Cancel();
Become(Connected);
Stash.UnstashAll();
});
_cancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(10), Self, new Reconnect(), Self);
}
private void ReadUnconfirmed()
{
Command<ReadUnconfirmedOrderMessages>(msg =>
{
global::Akka.Persistence.AtLeastOnceDeliverySnapshot deliverySnapshot = GetDeliverySnapshot();
IEnumerable<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>> issuedMessages =
deliverySnapshot.UnconfirmedDeliveries.Select(x => x.Message)
.OfType<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>>();
IEnumerable<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>> notIssuedMessages =
deliverySnapshot.UnconfirmedDeliveries.Select(x => x.Message)
.OfType<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>>();
List<IChangePrescriptionStatusMessage> messages = new List<IChangePrescriptionStatusMessage>();
messages.AddRange(issuedMessages.Select(x => x.Message));
messages.AddRange(notIssuedMessages.Select(x => x.Message));
Sender.Tell(new UnconfirmedOrderMessages(messages), Self);
});
}
private void SaveSnapshot()
{
global::Akka.Persistence.AtLeastOnceDeliverySnapshot snapshot = GetDeliverySnapshot();
SaveSnapshot(snapshot);
}
#endregion
private class Reconnect
{
}
}
public class OrderActor : ReceiveActor
{
private static OrderHandler _orderHandler;
/// <summary>
/// Initializes a new instance of the <see cref="OrderActor" /> class.
/// </summary>
public OrderActor(INaipLog log, OrderHandler orderHandler)
{
_orderHandler = orderHandler;
Receive<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>>(msg =>
{
log.Info(LogPriority.Normal, $"OrderActor received ChangePrescriptionStatusToIssued message for order id {msg.Message.OrderId}");
try
{
SaveWebOrderPrescriptionStatusIssued(msg.Message);
Sender.Tell(new ReliableDeliveryAck(msg.MessageId));
}
catch (System.Exception exception)
{
log.Exception(exception);
}
});
Receive<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>>(msg =>
{
log.Info(LogPriority.Normal, $"OrderActor received ChangePrescriptionStatusToNotIssued message for order id {msg.Message.OrderId}");
try
{
SaveWebOrderPrescriptionStatusNotIssued(msg.Message);
Sender.Tell(new ReliableDeliveryAck(msg.MessageId));
}
catch (System.Exception exception)
{
log.Exception(exception);
}
});
}
private static void SaveWebOrderPrescriptionStatusIssued(ChangePrescriptionStatusToIssued message)
{
_orderHandler.SetPrescriptionStatusIssued(message.OrderId);
}
private static void SaveWebOrderPrescriptionStatusNotIssued(ChangePrescriptionStatusToNotIssued message)
{
_orderHandler.SetPrescriptionStatusNotIssued(message.OrderId, message.UserName, message.Comment);
}
}
Program.NaipActorSystem.ActorOf(Props.Create(() => new OrderActor(Log.Akka, this)), "orderActor");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment