Skip to content

Instantly share code, notes, and snippets.

@shaunsales
Last active March 30, 2019 14:23
Show Gist options
  • Save shaunsales/c2cc1a877beca6dbf58054a34dc03c6a to your computer and use it in GitHub Desktop.
Save shaunsales/c2cc1a877beca6dbf58054a34dc03c6a to your computer and use it in GitHub Desktop.
using System;
using Newtonsoft.Json;
using Confluent.Kafka;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Fraction.Services.Messaging.Common.Helpers;
using Fraction.Services.Messaging.Common.Messages;
namespace Fraction.Services.Messaging.Common.Consumers
{
public class OrderConsumerBackgroundService : BackgroundService
{
private readonly IOrderConsumerController _orderConsumerController;
private readonly IConsumer<string, string> _consumer;
private readonly int _frequencyMs;
private readonly int _timeoutMs;
private Task _task;
public OrderConsumerBackgroundService(IOptions<ConsumerOptions> options, IOrderConsumerController orderConsumerController)
{
_orderConsumerController = orderConsumerController;
_frequencyMs = options.Value.FrequencyMs;
_timeoutMs = options.Value.TimeoutMs;
_consumer = new ConsumerBuilder<string, string>(options.Value.Config).Build();
_consumer.Subscribe(options.Value.Topic);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.Register(() => Console.WriteLine("OrderConsumerBackgroundService is stopping."));
while (!stoppingToken.IsCancellationRequested)
{
if (_task == null ||
_task.Status == TaskStatus.Canceled ||
_task.Status == TaskStatus.Faulted ||
_task.Status == TaskStatus.RanToCompletion)
{
_task = Task.Run(() => { DoWork(); }, stoppingToken);
}
await Task.Delay(_frequencyMs, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
await _task;
_consumer.Close();
_consumer.Dispose();
}
private void DoWork()
{
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(_timeoutMs));
if (consumeResult == null) return;
switch (consumeResult.Key)
{
case "OrderPlaced":
{
var obj = JsonConvert.DeserializeObject<OrderPlaced>(consumeResult.Value);
_orderConsumerController.OnOrderPlaced(obj);
break;
}
case "OrderFilled":
{
var obj = JsonConvert.DeserializeObject<OrderFilled>(consumeResult.Value);
_orderConsumerController.OnOrderFilled(obj);
break;
}
case "OrderCanceled":
{
var obj = JsonConvert.DeserializeObject<OrderCanceled>(consumeResult.Value);
_orderConsumerController.OnOrderCanceled(obj);
break;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment