Skip to content

Instantly share code, notes, and snippets.

@tonysneed
tonysneed / console-consumer
Created June 25, 2020 22:09
Console Consumer
docker exec -it broker bash
cd /usr/bin
./kafka-console-consumer --bootstrap-server broker:29092 --topic "processed-events"
@tonysneed
tonysneed / kafka-docker-compose
Created June 25, 2020 22:02
Kafka Docker Compose
git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one
git checkout 5.5.0-post
cd cp-all-in-one/
docker-compose up -d --build
docker-compose ps
@tonysneed
tonysneed / program-create-host-builder.cs
Created June 25, 2020 21:55
Program.CreateHostBuilder
public class Program
{
/// <summary>
/// Application entry point: builds a host from <c>CreateHostBuilder</c> and runs it.
/// </summary>
/// <param name="args">Command-line arguments, forwarded unchanged to <c>CreateHostBuilder</c>.</param>
public static void Main(string[] args)
{
    // Build first, then run, so the two steps are visible separately.
    var host = CreateHostBuilder(args).Build();
    host.Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
@tonysneed
tonysneed / transform-handler.cs
Created June 25, 2020 21:34
Transform Handler
/// <summary>
/// Message handler that upper-cases the string value of each message and then
/// hands the transformed message to the base <c>MessageHandler</c>.
/// </summary>
public class TransformHandler : MessageHandler
{
    /// <summary>
    /// Creates a new message with the same key and an upper-cased value, then
    /// forwards it to <c>base.HandleMessage</c>.
    /// </summary>
    /// <param name="sourceMessage">
    /// Incoming message; it is cast to <c>Message&lt;int, string&gt;</c>.
    /// </param>
    /// <returns>The result of <c>base.HandleMessage</c> for the transformed message.</returns>
    /// <exception cref="System.InvalidCastException">
    /// Thrown when <paramref name="sourceMessage"/> is not a <c>Message&lt;int, string&gt;</c>.
    /// </exception>
    public override async Task<Message> HandleMessage(Message sourceMessage)
    {
        var message = (Message<int, string>)sourceMessage;

        // Use invariant casing so the output does not depend on the host's current
        // culture (e.g. the Turkish dotted/dotless "i"). The null-conditional lets a
        // message with a null value pass through instead of throwing
        // NullReferenceException.
        // NOTE(review): presumably null values correspond to Kafka tombstones — confirm
        // that forwarding them unchanged is the desired behavior.
        var upperCasedValue = message.Value?.ToUpperInvariant();
        var sinkMessage = new Message<int, string>(message.Key, upperCasedValue);

        return await base.HandleMessage(sinkMessage);
    }
}
@tonysneed
tonysneed / kafka-utils.cs
Last active June 25, 2020 22:38
Kafka Utility Methods
public static class KafkaUtils
{
public static IConsumer<int, string> CreateConsumer(string brokerList, List<string> topics)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
GroupId = "sample-consumer"
};
var consumer = new ConsumerBuilder<int, string>(config).Build();
@tonysneed
tonysneed / worker-execute-async.cs
Created June 25, 2020 21:19
Worker ExecuteAsync with IEventProcessor
public class Worker : BackgroundService
{
private readonly IEventProcessor _eventProcessor;
private readonly ILogger<Worker> _logger;
/// <summary>
/// Creates the worker and stores the supplied event processor and logger
/// in its private fields.
/// </summary>
/// <param name="eventProcessor">Event processor assigned to <c>_eventProcessor</c>.</param>
/// <param name="logger">Logger assigned to <c>_logger</c>.</param>
public Worker(IEventProcessor eventProcessor, ILogger<Worker> logger) =>
    (_eventProcessor, _logger) = (eventProcessor, logger);
@tonysneed
tonysneed / add-framework-package
Created June 25, 2020 17:36
EventStreamProcessing.Kafka
dotnet add package EventStreamProcessing.Kafka
@tonysneed
tonysneed / dotnet-new-worker
Created June 25, 2020 17:34
Dotnet New Worker
dotnet new worker --name MyWorker
@tonysneed
tonysneed / kafka-event-processor-process.cs
Created June 25, 2020 17:26
KafkaEventProcessor.Process
public override async Task Process(CancellationToken cancellationToken = default)
{
// Build chain of handlers
BuildHandlerChain();
// Consume event
var sourceEvent = consumer.ConsumeEvent(cancellationToken);
// Return if EOF
if (sourceEvent == null) return;
public class Message<TKey, TValue> : Message
{
public TKey Key { get; set; }
public TValue Value { get; set; }
/// <summary>
/// Initializes a message with the given key and value.
/// </summary>
/// <param name="key">Value assigned to <c>Key</c>.</param>
/// <param name="value">Value assigned to <c>Value</c>.</param>
public Message(TKey key, TValue value)
{
    // The two assignments are independent; payload first, then key.
    this.Value = value;
    this.Key = key;
}