Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
public override async Task Process(CancellationToken cancellationToken = default)
// Build chain of handlers
// Consume event
var sourceEvent = consumer.ConsumeEvent(cancellationToken);
// Return if EOF
if (sourceEvent == null) return;
// Invoke handler chain
var sourceMessage = new Message<TSourceKey, TSourceValue>(sourceEvent.Key, sourceEvent.Value);
var sinkMessage = await handlers[0].HandleMessage(sourceMessage) as Message<TSinkKey, TSinkValue>;
// Return if message filtered out
if (sinkMessage == null) return;
// Produce event
var sinkEvent = new Confluent.Kafka.Message<TSinkKey, TSinkValue>
Key = sinkMessage.Key,
Value = sinkMessage.Value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment