Skip to content

Instantly share code, notes, and snippets.

@tonysneed
Created June 25, 2020 17:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonysneed/55e029708e025843e6bc42e2b111cc43 to your computer and use it in GitHub Desktop.
Save tonysneed/55e029708e025843e6bc42e2b111cc43 to your computer and use it in GitHub Desktop.
KafkaEventProcessor.Process
public override async Task Process(CancellationToken cancellationToken = default)
{
// Build chain of handlers
BuildHandlerChain();
// Consume event
var sourceEvent = consumer.ConsumeEvent(cancellationToken);
// Return if EOF
if (sourceEvent == null) return;
// Invoke handler chain
var sourceMessage = new Message<TSourceKey, TSourceValue>(sourceEvent.Key, sourceEvent.Value);
var sinkMessage = await handlers[0].HandleMessage(sourceMessage) as Message<TSinkKey, TSinkValue>;
// Return if message filtered out
if (sinkMessage == null) return;
// Produce event
var sinkEvent = new Confluent.Kafka.Message<TSinkKey, TSinkValue>
{
Key = sinkMessage.Key,
Value = sinkMessage.Value
};
producer.ProduceEvent(sinkEvent);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment