Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Last active February 15, 2024 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Aaronontheweb/749c890eeea450d18f1ec268783bebca to your computer and use it in GitHub Desktop.
Save Aaronontheweb/749c890eeea450d18f1ec268783bebca to your computer and use it in GitHub Desktop.
Akka.Delivery stream
/// <summary>
/// Demo entry point: builds a queue-backed producer stream, starts a consumer actor,
/// offers 1000 messages through the queue, then cancels the stream and waits for it to complete.
/// </summary>
/// <remarks>
/// The actor system is always terminated before returning (even when an exception escapes),
/// so no actors or dispatcher threads are left behind once the demo has finished.
/// </remarks>
async Task Main()
{
    var actorSystem = ActorSystem.Create("Sys");
    try
    {
        var producerSettings = ProducerController.Settings.Create(actorSystem);
        using var cts = new CancellationTokenSource();

        var (queue, producerController) = StreamConstruction.CreateProducerStream<IMessageProtocol>(
            actorSystem, producerSettings, "myproducer", cts.Token);

        // The consumer registers itself with the producer controller in its PreStart,
        // so the returned IActorRef is not needed here.
        actorSystem.ActorOf(Props.Create(() => new ConsumerActor(producerController)), "consumer");

        foreach (var i in Enumerable.Range(0, 1000))
        {
            // OfferAsync is awaited one message at a time; the result is printed as-is.
            var result = await queue.OfferAsync(new MessageImpl1(i.ToString()));
            Console.WriteLine($"{result} message {i}");
        }

        // Trigger cancellation of the stream one second (1000 ms) from now, then wait for the queue to finish.
        cts.CancelAfter(1000);
        await queue.WatchCompletionAsync();
    }
    finally
    {
        // Shut the actor system down instead of leaking it past the end of the demo.
        await actorSystem.Terminate();
    }
}
/// <summary>
/// Helpers for wiring an Akka.Streams source queue to an Akka.Delivery producer controller.
/// </summary>
public static class StreamConstruction
{
    /// <summary>
    /// Creates a producer controller plus a producer actor, and runs a stream that feeds
    /// every element offered to the returned queue into that producer actor.
    /// </summary>
    /// <typeparam name="TProtocol">Type of the messages pushed through the queue.</typeparam>
    /// <param name="factory">Factory used to create the actors and the stream materializer.</param>
    /// <param name="settings">Settings handed to <c>ProducerController.Create</c>.</param>
    /// <param name="producerId">
    /// Producer id handed to <c>ProducerController.Create</c>. When <c>null</c>, a GUID-based id is generated.
    /// </param>
    /// <param name="ct">
    /// Token that is converted into a flow stage placed between the queue and the sink;
    /// cancelling it shuts the stream down.
    /// </param>
    /// <returns>The materialized source queue and the producer controller actor.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
    /// <remarks>
    /// The two actors are named <c>producer-controller-{token}</c> and <c>producer-{token}</c>, where
    /// <c>{token}</c> is a fresh GUID per call. This allows the method to be called more than once on the
    /// same <paramref name="factory"/> without hitting a duplicate-actor-name failure.
    /// </remarks>
    public static (ISourceQueueWithComplete<TProtocol> queue, IActorRef producerController) CreateProducerStream<TProtocol>(IActorRefFactory factory, ProducerController.Settings settings, string? producerId = null, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(factory);

        // Bounded queue (25 elements) that backpressures the caller when full.
        var source = Source.Queue<TProtocol>(25, Akka.Streams.OverflowStrategy.Backpressure);
        var materializer = factory.Materializer();
        var (queue, rawSource) = source.PreMaterialize(materializer);

        // One GUID per call: used for the fallback producer id and for the actor names,
        // so neither can collide with a previous invocation. "N" format is 32 hex digits,
        // which is always a valid actor name fragment.
        var uniqueToken = Guid.NewGuid().ToString("N");
        producerId ??= $"producer-controller-{uniqueToken}";

        var producerController = factory.ActorOf(
            ProducerController.Create<TProtocol>(factory, producerId, Option<Props>.None, settings),
            $"producer-controller-{uniqueToken}");
        var producer = factory.ActorOf(
            Props.Create(() => new ProducerActor<TProtocol>(producerController)),
            $"producer-{uniqueToken}");

        // The sink talks to the producer actor using its Init / AckStream / Complete protocol messages.
        var sink = Sink.ActorRefWithAck<TProtocol>(producer,
            ProducerActor<TProtocol>.Init.Instance,
            ProducerActor<TProtocol>.AckStream.Instance,
            ProducerActor<TProtocol>.Complete.Instance);

        // NOTE(review): the `true` argument presumably requests graceful completion (rather than failure)
        // on cancellation - confirm against the Akka.Streams documentation.
        var killSwitch = ct.AsFlow<TProtocol>(true);

        // begin running the graph
        rawSource.Via(killSwitch).RunWith(sink, materializer);
        return (queue, producerController);
    }
}
/// <summary>
/// Contract for the messages sent through the delivery stream in this sample.
/// </summary>
public interface IMessageProtocol
{
    /// <summary>Read-only string identifier carried by the message.</summary>
    string EntityId { get; }
}
/// <summary>
/// Immutable <see cref="IMessageProtocol"/> implementation that carries nothing but an entity id.
/// </summary>
public sealed class MessageImpl1 : IMessageProtocol
{
    /// <summary>Creates a message for the given entity id.</summary>
    /// <param name="entityId">Value exposed through <see cref="EntityId"/>; stored as-is without validation.</param>
    public MessageImpl1(string entityId) => EntityId = entityId;

    /// <summary>The entity id supplied to the constructor.</summary>
    public string EntityId { get; }
}
/// <summary>
/// Consumer side of the sample: spawns a consumer controller as a child, registers it with the
/// supplied producer controller, then logs and confirms every delivery it receives.
/// </summary>
public sealed class ConsumerActor : UntypedActor
{
    private readonly IActorRef _producerController;

    // Assigned in PreStart; child actor named "controller".
    private IActorRef _consumerController;

    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <param name="producerController">Producer controller the child consumer controller registers with.</param>
    public ConsumerActor(IActorRef producerController) => _producerController = producerController;

    /// <summary>
    /// Logs each <c>ConsumerController.Delivery</c> and confirms it; anything else is reported as unhandled.
    /// </summary>
    protected override void OnReceive(object message)
    {
        if (message is ConsumerController.Delivery<IMessageProtocol> delivery)
        {
            _log.Info("Received {0}", delivery.Message);
            // Confirm so the controller knows this message has been processed.
            delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
            return;
        }

        Unhandled(message);
    }

    /// <summary>
    /// Creates the child consumer controller, starts it with this actor as the delivery target,
    /// and registers it with the producer controller.
    /// </summary>
    protected override void PreStart()
    {
        var settings = ConsumerController.Settings.Create(Context.System);

        _consumerController = Context.ActorOf(
            ConsumerController.Create<IMessageProtocol>(Context, Option<IActorRef>.None, settings),
            "controller");

        _consumerController.Tell(new ConsumerController.Start<IMessageProtocol>(Self));
        _consumerController.Tell(new ConsumerController.RegisterToProducerController<IMessageProtocol>(_producerController));
    }
}
/// <summary>
/// Bridges an ack-based stream sink and a producer controller: elements arriving from the stream are
/// forwarded to the controller when it asks for the next message, and the stream is acknowledged
/// only once its element has been handed over.
/// </summary>
/// <typeparam name="TProtocol">Type of the messages being delivered.</typeparam>
public class ProducerActor<TProtocol> : UntypedActor
{
    /// <summary>
    /// Acknowledgement carrying sequence numbers. Not sent anywhere in this class at present.
    /// </summary>
    public sealed class Ack
    {
        public Ack(long currentSeqNo, long highestConfirmedSeqNo)
        {
            CurrentSeqNo = currentSeqNo;
            HighestConfirmedSeqNo = highestConfirmedSeqNo;
        }

        public long CurrentSeqNo { get; }

        public long HighestConfirmedSeqNo { get; }
    }

    /// <summary>Singleton reply sent to the stream in response to <see cref="Init"/> and to each accepted element.</summary>
    public sealed class AckStream
    {
        public static readonly AckStream Instance = new();

        private AckStream() { }
    }

    /// <summary>Singleton handshake message; answered with <see cref="AckStream"/>.</summary>
    public sealed class Init
    {
        public static readonly Init Instance = new();

        private Init() { }
    }

    /// <summary>Singleton message that makes this actor stop itself and the producer controller.</summary>
    public sealed class Complete
    {
        public static readonly Complete Instance = new();

        private Complete() { }
    }

    private readonly IActorRef _producerController;

    // Elements received while the controller had no outstanding request, paired with who sent them.
    private readonly Queue<(TProtocol m, IActorRef sender)> _undelivered = new();

    /// <param name="producerController">Producer controller this actor registers with and feeds.</param>
    public ProducerActor(IActorRef producerController) => _producerController = producerController;

    /// <summary>
    /// Default behaviour: no request from the controller is pending, so incoming elements are parked.
    /// </summary>
    protected override void OnReceive(object message)
    {
        // The element check comes first, exactly as in the switch-based formulation.
        if (message is TProtocol element)
        {
            // No ack yet: the sender is acknowledged when the element is actually handed over.
            _undelivered.Enqueue((element, Sender));
            return;
        }

        if (message is Init)
        {
            AcknowledgeSender();
            return;
        }

        if (message is ProducerController.RequestNext<TProtocol> demand)
        {
            if (_undelivered.Count == 0)
            {
                // Nothing to deliver yet: remember the request and wait for the next element.
                Become(ReadyToSendNow(demand));
            }
            else
            {
                // Parked elements exist: deliver the oldest one first.
                var (pending, replyTo) = _undelivered.Dequeue();
                demand.SendNextTo.Tell(pending);
                replyTo.Tell(AckStream.Instance);
            }

            return;
        }

        if (message is Complete)
        {
            StopSelfAndController();
            return;
        }

        Unhandled(message);
    }

    /// <summary>
    /// Behaviour used while the controller's request <paramref name="req"/> is outstanding:
    /// the next element is forwarded immediately and the actor returns to <see cref="OnReceive"/>.
    /// </summary>
    /// <returns>A handler that reports <c>false</c> for messages it does not recognise.</returns>
    private Receive ReadyToSendNow(ProducerController.RequestNext<TProtocol> req)
    {
        bool AwaitingElement(object incoming)
        {
            if (incoming is TProtocol next)
            {
                req.SendNextTo.Tell(next);
                Sender.Tell(AckStream.Instance);
                Become(OnReceive);
                return true;
            }

            if (incoming is Init)
            {
                AcknowledgeSender();
                return true;
            }

            if (incoming is Complete)
            {
                StopSelfAndController();
                return true;
            }

            return false;
        }

        return AwaitingElement;
    }

    /// <summary>Registers this actor with the producer controller.</summary>
    protected override void PreStart()
    {
        _producerController.Tell(new ProducerController.Start<TProtocol>(Self));
    }

    // Replies to the current sender with the stream acknowledgement.
    private void AcknowledgeSender() => Sender.Tell(AckStream.Instance);

    // Sends a PoisonPill to this actor first, then to the producer controller.
    private void StopSelfAndController()
    {
        Self.Tell(PoisonPill.Instance);
        _producerController.Tell(PoisonPill.Instance);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment