Last active
February 15, 2024 18:05
-
-
Save Aaronontheweb/749c890eeea450d18f1ec268783bebca to your computer and use it in GitHub Desktop.
Akka.Delivery stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Demo entry point: starts an actor system, wires a source queue to a producer
/// controller, starts a consumer, offers 1000 messages to the queue and then
/// cancels the stream after one second.
/// </summary>
async Task Main()
{
    var actorSystem = ActorSystem.Create("Sys");
    try
    {
        var producerSettings = ProducerController.Settings.Create(actorSystem);
        using var cts = new CancellationTokenSource();
        var (queue, producerController) = StreamConstruction.CreateProducerStream<IMessageProtocol>(actorSystem, producerSettings, "myproducer", cts.Token);

        // The consumer registers itself with the producer controller in PreStart,
        // so the returned reference is not needed here.
        _ = actorSystem.ActorOf(Props.Create(() => new ConsumerActor(producerController)), "consumer");

        foreach (var i in Enumerable.Range(0, 1000))
        {
            // OfferAsync completes once the queue has decided what to do with the element.
            var result = await queue.OfferAsync(new MessageImpl1(i.ToString()));
            Console.WriteLine($"{result} message {i}");
        }

        // Trigger the cancellation token one second from now, then wait for the
        // queue to report completion.
        cts.CancelAfter(1000);
        await queue.WatchCompletionAsync();
    }
    finally
    {
        // Always shut the actor system down, even when offering or completion fails,
        // so its threads and actors do not outlive this method.
        await actorSystem.Terminate();
    }
}
/// <summary>
/// Helpers for connecting an Akka.Streams source queue to an Akka.Delivery producer controller.
/// </summary>
public static class StreamConstruction
{
    /// <summary>
    /// Creates a backpressured source queue (buffer size 25), a producer controller and a
    /// <see cref="ProducerActor{TProtocol}"/>, and starts a stream that pushes every queued
    /// element to the producer actor using an ack-based sink.
    /// </summary>
    /// <typeparam name="TProtocol">Type of the messages flowing through the stream.</typeparam>
    /// <param name="factory">Actor system or context used to create the actors and the materializer.</param>
    /// <param name="settings">Settings passed through to the producer controller.</param>
    /// <param name="producerId">
    /// Id of the producer. When <c>null</c>, a GUID-based id is generated. The id is also
    /// used to derive the actor names, so it must be unique per <paramref name="factory"/>.
    /// </param>
    /// <param name="ct">Token wired into the stream as a kill switch.</param>
    /// <returns>The materialized queue and the producer controller reference.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
    public static (ISourceQueueWithComplete<TProtocol> queue, IActorRef producerController) CreateProducerStream<TProtocol>(IActorRefFactory factory, ProducerController.Settings settings, string? producerId = null, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(factory);

        var source = Source.Queue<TProtocol>(25, Akka.Streams.OverflowStrategy.Backpressure);
        var materializer = factory.Materializer();
        var (queue, rawSource) = source.PreMaterialize(materializer);

        // A GUID gives a far stronger uniqueness guarantee than a random number below 10,000.
        producerId ??= $"producer-controller-{Guid.NewGuid():N}";

        // Derive the actor names from the producer id so this method can be called more than
        // once on the same factory without a name collision. The id is percent-encoded
        // because actor names only accept a restricted character set.
        var actorNameSuffix = Uri.EscapeDataString(producerId);

        var producerController = factory.ActorOf(
            ProducerController.Create<TProtocol>(factory, producerId, Option<Props>.None, settings),
            $"producer-controller-{actorNameSuffix}");
        var producer = factory.ActorOf(
            Props.Create(() => new ProducerActor<TProtocol>(producerController)),
            $"producer-{actorNameSuffix}");

        var sink = Sink.ActorRefWithAck<TProtocol>(producer,
            ProducerActor<TProtocol>.Init.Instance,
            ProducerActor<TProtocol>.AckStream.Instance,
            ProducerActor<TProtocol>.Complete.Instance);
        var killSwitch = ct.AsFlow<TProtocol>(true);

        // begin running the graph
        rawSource.Via(killSwitch).RunWith(sink, materializer);
        return (queue, producerController);
    }
}
/// <summary>
/// Contract for the messages sent through the demo stream.
/// </summary>
public interface IMessageProtocol
{
    /// <summary>Gets the entity id carried by the message.</summary>
    string EntityId { get; }
}
/// <summary>
/// Immutable <see cref="IMessageProtocol"/> implementation that only carries an entity id.
/// </summary>
public sealed class MessageImpl1 : IMessageProtocol
{
    /// <summary>Creates a message for the given entity id.</summary>
    /// <param name="entityId">Value exposed through <see cref="EntityId"/>.</param>
    public MessageImpl1(string entityId) => EntityId = entityId;

    /// <summary>Gets the entity id supplied at construction.</summary>
    public string EntityId { get; }
}
/// <summary>
/// Consumer side of the demo: starts a consumer controller as a child, registers it with
/// the given producer controller, then logs and confirms every delivery it receives.
/// </summary>
public sealed class ConsumerActor : UntypedActor
{
    private readonly IActorRef _producerController;

    // Created in PreStart, so it is legitimately null between construction and actor start.
    private IActorRef? _consumerController;

    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates the consumer.</summary>
    /// <param name="producerController">Producer controller the consumer controller registers with.</param>
    public ConsumerActor(IActorRef producerController)
    {
        _producerController = producerController;
    }

    /// <summary>
    /// Logs each delivery and replies with a confirmation to the delivery's
    /// <c>ConfirmTo</c> reference; every other message is reported as unhandled.
    /// </summary>
    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case ConsumerController.Delivery<IMessageProtocol> delivery:
                _log.Info("Received {0}", delivery.Message);
                delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
                break;
            default:
                Unhandled(message);
                break;
        }
    }

    /// <summary>
    /// Creates the child consumer controller named "controller", tells it to deliver to
    /// this actor, and registers it with the producer controller.
    /// </summary>
    protected override void PreStart()
    {
        var consumerControllerSettings = ConsumerController.Settings.Create(Context.System);
        var consumerControllerProps =
            ConsumerController.Create<IMessageProtocol>(Context, Option<IActorRef>.None,
                consumerControllerSettings);

        _consumerController = Context.ActorOf(consumerControllerProps, "controller");
        _consumerController.Tell(new ConsumerController.Start<IMessageProtocol>(Self));
        _consumerController.Tell(new ConsumerController.RegisterToProducerController<IMessageProtocol>(_producerController));
    }
}
/// <summary>
/// Sits between an ack-based stream sink and a producer controller. Stream elements are
/// held until the producer controller signals demand with a <c>RequestNext</c>, and the
/// stream is only acknowledged once an element has been handed to the controller.
/// </summary>
/// <typeparam name="TProtocol">Type of the messages being delivered.</typeparam>
public class ProducerActor<TProtocol> : UntypedActor
{
    /// <summary>
    /// Carries a current and a highest-confirmed sequence number.
    /// This actor does not currently send it.
    /// </summary>
    public sealed class Ack
    {
        public Ack(long currentSeqNo, long highestConfirmedSeqNo)
        {
            CurrentSeqNo = currentSeqNo;
            HighestConfirmedSeqNo = highestConfirmedSeqNo;
        }

        public long CurrentSeqNo { get; }

        public long HighestConfirmedSeqNo { get; }
    }

    /// <summary>Acknowledgement sent back to the sender of <see cref="Init"/> and of each element.</summary>
    public sealed class AckStream
    {
        public static readonly AckStream Instance = new AckStream();

        private AckStream() { }
    }

    /// <summary>Initialization message; answered with <see cref="AckStream"/>.</summary>
    public sealed class Init
    {
        public static readonly Init Instance = new Init();

        private Init() { }
    }

    /// <summary>Completion message; triggers shutdown of this actor and the producer controller.</summary>
    public sealed class Complete
    {
        public static readonly Complete Instance = new Complete();

        private Complete() { }
    }

    private readonly IActorRef _producerController;
    private readonly Queue<(TProtocol m, IActorRef sender)> _undelivered = new();

    // Set when Complete arrives while elements are still waiting for demand.
    private bool _completeRequested;

    /// <summary>Creates the producer.</summary>
    /// <param name="producerController">Producer controller this actor starts and feeds.</param>
    public ProducerActor(IActorRef producerController)
    {
        _producerController = producerController;
    }

    /// <summary>
    /// Default behavior, used while there is no outstanding demand: elements are queued
    /// together with their sender, and a <c>RequestNext</c> either drains the oldest
    /// queued element or switches to <see cref="ReadyToSendNow"/>.
    /// </summary>
    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case TProtocol p:
                _undelivered.Enqueue((p, Sender));
                break;
            case Init _:
                Sender.Tell(AckStream.Instance);
                break;
            case ProducerController.RequestNext<TProtocol> req:
                // have backpressured messages - deliver oldest first
                if (_undelivered.TryDequeue(out var next))
                {
                    req.SendNextTo.Tell(next.m);
                    next.sender.Tell(AckStream.Instance);

                    // A deferred Complete can be honoured once the last queued element is out.
                    if (_completeRequested && _undelivered.Count == 0)
                    {
                        Shutdown();
                    }
                }
                else
                {
                    // nothing to deliver yet
                    Become(ReadyToSendNow(req));
                }
                break;
            case Complete _:
                if (_undelivered.Count == 0)
                {
                    Shutdown();
                }
                else
                {
                    // Stopping now would silently drop the queued elements; wait for the
                    // producer controller to request them first. If no further RequestNext
                    // ever arrives, this actor stays alive.
                    _completeRequested = true;
                }
                break;
            default:
                Unhandled(message);
                break;
        }
    }

    /// <summary>
    /// Behavior used while a <c>RequestNext</c> is outstanding: the next element is
    /// forwarded immediately and the actor returns to <see cref="OnReceive"/>.
    /// </summary>
    /// <param name="req">The outstanding demand signal from the producer controller.</param>
    private Receive ReadyToSendNow(ProducerController.RequestNext<TProtocol> req)
    {
        return o =>
        {
            switch (o)
            {
                case TProtocol p:
                    req.SendNextTo.Tell(p);
                    Sender.Tell(AckStream.Instance);
                    Become(OnReceive);
                    return true;
                case Init _:
                    Sender.Tell(AckStream.Instance);
                    return true;
                case Complete _:
                    // This behavior is only entered with an empty queue, so nothing can be lost.
                    Shutdown();
                    return true;
                default:
                    return false;
            }
        };
    }

    /// <summary>Stops this actor and the producer controller via <c>PoisonPill</c>.</summary>
    private void Shutdown()
    {
        Self.Tell(PoisonPill.Instance);
        _producerController.Tell(PoisonPill.Instance);
    }

    /// <summary>Registers this actor as the producer with the producer controller.</summary>
    protected override void PreStart()
    {
        _producerController.Tell(new ProducerController.Start<TProtocol>(Self));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment