Last active
March 30, 2019 14:23
-
-
Save shaunsales/c2cc1a877beca6dbf58054a34dc03c6a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Newtonsoft.Json; | |
using Confluent.Kafka; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Options; | |
using Fraction.Services.Messaging.Common.Helpers; | |
using Fraction.Services.Messaging.Common.Messages; | |
namespace Fraction.Services.Messaging.Common.Consumers | |
{ | |
/// <summary>
/// Hosted background service that polls a Kafka topic on a fixed cadence and
/// dispatches each consumed message to an <see cref="IOrderConsumerController"/>
/// based on the message key.
/// </summary>
public class OrderConsumerBackgroundService : BackgroundService
{
    private readonly IOrderConsumerController _orderConsumerController;
    private readonly IConsumer<string, string> _consumer;
    private readonly int _frequencyMs;
    private readonly int _timeoutMs;

    // The in-flight (or most recently finished) poll; null until the first poll is scheduled.
    private Task _task;

    /// <summary>
    /// Builds the Kafka consumer from the supplied options and subscribes it to the configured topic.
    /// </summary>
    /// <param name="options">Consumer settings: Kafka config, topic, poll frequency and consume timeout.</param>
    /// <param name="orderConsumerController">Receiver of the deserialized order messages.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public OrderConsumerBackgroundService(IOptions<ConsumerOptions> options, IOrderConsumerController orderConsumerController)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _orderConsumerController = orderConsumerController ?? throw new ArgumentNullException(nameof(orderConsumerController));

        var settings = options.Value;
        _frequencyMs = settings.FrequencyMs;
        _timeoutMs = settings.TimeoutMs;

        _consumer = new ConsumerBuilder<string, string>(settings.Config).Build();
        _consumer.Subscribe(settings.Topic);
    }

    /// <summary>
    /// Poll loop: every <c>FrequencyMs</c> milliseconds, schedules a new poll on the
    /// thread pool unless the previous one is still running.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting the service down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.Register(() => Console.WriteLine("OrderConsumerBackgroundService is stopping."));

        while (!stoppingToken.IsCancellationRequested)
        {
            // IsCompleted covers RanToCompletion, Faulted and Canceled, i.e. "no poll in flight".
            if (_task == null || _task.IsCompleted)
            {
                if (_task != null && _task.IsFaulted)
                {
                    // Surface the failure instead of silently dropping it before the next poll replaces the task.
                    Console.Error.WriteLine($"OrderConsumerBackgroundService poll failed: {_task.Exception.GetBaseException()}");
                }

                _task = Task.Run(() => DoWork(), stoppingToken);
            }

            await Task.Delay(_frequencyMs, stoppingToken);
        }
    }

    /// <summary>
    /// Stops the poll loop, waits for any in-flight poll to finish, then closes and
    /// disposes the Kafka consumer. The consumer is released even when stopping fails.
    /// </summary>
    /// <param name="cancellationToken">Signals that the shutdown should no longer be graceful.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await base.StopAsync(cancellationToken);

            // _task is still null when the service is stopped before the first poll was scheduled.
            if (_task != null)
            {
                try
                {
                    await _task;
                }
                catch (OperationCanceledException)
                {
                    // The last poll was canceled by the stopping token before it started; expected on shutdown.
                }
            }
        }
        finally
        {
            // Always release the consumer, including when the last poll faulted.
            _consumer.Close();
            _consumer.Dispose();
        }
    }

    /// <summary>
    /// Consumes at most one message (waiting up to <c>TimeoutMs</c> milliseconds) and routes it
    /// to the controller by key. A null consume result is treated as "nothing to do"; messages
    /// whose key matches none of the cases below are ignored.
    /// </summary>
    private void DoWork()
    {
        var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(_timeoutMs));
        if (consumeResult == null)
        {
            return;
        }

        switch (consumeResult.Key)
        {
            case "OrderPlaced":
            {
                var obj = JsonConvert.DeserializeObject<OrderPlaced>(consumeResult.Value);
                _orderConsumerController.OnOrderPlaced(obj);
                break;
            }
            case "OrderFilled":
            {
                var obj = JsonConvert.DeserializeObject<OrderFilled>(consumeResult.Value);
                _orderConsumerController.OnOrderFilled(obj);
                break;
            }
            case "OrderCanceled":
            {
                var obj = JsonConvert.DeserializeObject<OrderCanceled>(consumeResult.Value);
                _orderConsumerController.OnOrderCanceled(obj);
                break;
            }
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment