Skip to content

Instantly share code, notes, and snippets.

@AndreSteenbergen
Created September 30, 2018 16:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreSteenbergen/55a325ab5417df06b3ac451c05058721 to your computer and use it in GitHub Desktop.
Save AndreSteenbergen/55a325ab5417df06b3ac451c05058721 to your computer and use it in GitHub Desktop.
PostStop not called
using System;
using System.Threading.Tasks;
using Akka.Streams.Stage;
using Akka.Util.Internal;
using Confluent.Kafka;
namespace Akka.Streams.Kafka.Stages
{
internal sealed class ProducerStageLogicMock<K, V> : GraphStageLogic
{
private readonly TaskCompletionSource<NotUsed> _stageCompletion;
private readonly ProducerStage<K, V> _stage;
private volatile bool _inIsClosed;
private readonly AtomicCounter _awaitingConfirmation = new AtomicCounter(0);
public ProducerStageLogicMock(ProducerStage<K, V> stage, Attributes attributes, TaskCompletionSource<NotUsed> completion) : base(stage.Shape)
{
_stageCompletion = completion;
_stage = stage;
SetHandler(_stage.In,
onPush: () =>
{
var msg = Grab(_stage.In);
var result = new TaskCompletionSource<DeliveryReport<K, V>>();
Console.WriteLine("NOT Producing: " + msg.TopicPartition);
var rst = new DeliveryReport<K, V>
{
Message = new Message<K, V>
{
Key = msg.Key,
Value = msg.Value,
Timestamp = Timestamp.Default
},
Topic = msg.Topic,
Offset = -1,
Partition = msg.Partition
};
var t = Task.Delay(100).ContinueWith(_ =>
{
result.SetResult(rst);
if (_awaitingConfirmation.DecrementAndGet() == 0 && _inIsClosed)
{
CheckForCompletion();
}
});
_awaitingConfirmation.IncrementAndGet();
Push(_stage.Out, result.Task);
},
onUpstreamFinish: () =>
{
_inIsClosed = true;
CheckForCompletion();
},
onUpstreamFailure: exception =>
{
_inIsClosed = true;
CheckForCompletion();
});
SetHandler(_stage.Out, onPull: () => TryPull(_stage.In));
}
public override void PreStart()
{
Console.WriteLine("Mock Stage started");
base.PreStart();
}
public override void PostStop()
{
Console.WriteLine("Mock Stage completed");
_stageCompletion.SetResult(NotUsed.Instance);
base.PostStop();
}
public void CheckForCompletion()
{
Console.WriteLine($"Checking complete {IsClosed(_stage.In)} -- {_awaitingConfirmation.Current}");
if (IsClosed(_stage.In) && _awaitingConfirmation.Current == 0)
{
CompleteStage();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment