KafkaEventProcessor.Process
public override async Task Process(CancellationToken cancellationToken = default)
{
    // Build chain of handlers
    BuildHandlerChain();

    // Consume event
    var sourceEvent = consumer.ConsumeEvent(cancellationToken);

    // Return if EOF
    if (sourceEvent == null) return;

    // Invoke handler chain
    var sourceMessage = new Message<TSourceKey, TSourceValue>(sourceEvent.Key, sourceEvent.Value);
    var sinkMessage = await handlers[0].HandleMessage(sourceMessage) as Message<TSinkKey, TSinkValue>;

    // Return if message filtered out
    if (sinkMessage == null) return;

    // Produce event
    var sinkEvent = new Confluent.Kafka.Message<TSinkKey, TSinkValue>
    {
        Key = sinkMessage.Key,
        Value = sinkMessage.Value
    };
    producer.ProduceEvent(sinkEvent);
}