Skip to content

Instantly share code, notes, and snippets.

@Chriz76
Last active June 15, 2021 19:26
Show Gist options
  • Save Chriz76/51d3cb5543bbb800855c55a601ac80dd to your computer and use it in GitHub Desktop.
Save Chriz76/51d3cb5543bbb800855c55a601ac80dd to your computer and use it in GitHub Desktop.
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json.Linq;
using PostService.Data;
using PostService.Entities;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PostService
{
/// <summary>
/// Hosted background service that consumes user integration events from RabbitMQ
/// and applies them to the local data store through <see cref="DataAccess"/>.
/// </summary>
/// <remarks>
/// Messages are read from the <c>user.postservicesingleactiveconsumer</c> queue, which is
/// bound to the <c>userloadtest</c> fanout exchange. The routing key of each delivery
/// selects the operation (<c>user.add</c> or <c>user.update</c>).
/// </remarks>
public class IntegrationEventListenerService : BackgroundService
{
    private const string QueueName = "user.postservicesingleactiveconsumer";
    private const string ExchangeName = "userloadtest";

    // Pause between listen attempts so an unreachable broker does not cause a busy loop.
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(5);

    private readonly DataAccess _dataAccess;

    /// <summary>Creates the listener.</summary>
    /// <param name="dataAccess">Data store gateway used to add and update users.</param>
    public IntegrationEventListenerService(DataAccess dataAccess)
    {
        _dataAccess = dataAccess;
    }

    /// <summary>
    /// Keeps a listener running until the host requests shutdown, waiting
    /// <see cref="RetryDelay"/> between attempts.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Connecting to the broker is synchronous; yield first so that work does not
        // run inline on the caller that starts this service.
        await Task.Yield();

        while (!stoppingToken.IsCancellationRequested)
        {
            await ListenForIntegrationEvents(stoppingToken);

            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            // The listener only returns early after a failure (already logged there);
            // back off before reconnecting instead of retrying immediately.
            try
            {
                await Task.Delay(RetryDelay, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Connects to RabbitMQ, declares the queue/exchange topology, registers the consumer
    /// and then waits until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Cancellation token that ends the wait.</param>
    /// <returns>
    /// A task that completes on shutdown or after a failure. Exceptions are written to
    /// the console rather than propagated, so the caller decides when to retry.
    /// </returns>
    private async Task ListenForIntegrationEvents(CancellationToken stoppingToken)
    {
        try
        {
            // NOTE(review): credentials and host names are hard-coded; they should
            // presumably come from configuration outside of test setups.
            var factory = new ConnectionFactory
            {
                UserName = "test",
                Password = "test"
            };
            var endpoints = new List<AmqpTcpEndpoint>
            {
                new AmqpTcpEndpoint("host.docker.internal"),
                new AmqpTcpEndpoint("localhost")
            };

            // Dispose the channel and the connection when listening ends, whether through
            // shutdown or an error, so repeated attempts do not leak connections.
            using (var connection = factory.CreateConnection(endpoints))
            using (var channel = connection.CreateModel())
            {
                var arguments = new Dictionary<string, object>
                {
                    { "x-single-active-consumer", true }
                };
                channel.QueueDeclare(QueueName, false, false, false, arguments);
                channel.ExchangeDeclare(ExchangeName, "fanout");
                channel.QueueBind(QueueName, ExchangeName, "");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine("IntegrationEvent {0}", message);

                        User user;
                        if (!TryParseUser(message, out user))
                        {
                            // A payload that cannot be parsed will never succeed, so
                            // reject it without requeueing instead of leaving it pending.
                            channel.BasicReject(ea.DeliveryTag, false);
                            return;
                        }

                        ApplyUserEvent(ea.RoutingKey, user);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception e)
                    {
                        // Make the failure visible. The delivery is deliberately left
                        // unacknowledged, so the broker keeps it and can redeliver it
                        // once this channel is closed.
                        Console.WriteLine(e.ToString());
                    }
                };

                channel.BasicConsume(queue: QueueName,
                    autoAck: false,
                    consumer: consumer);

                try
                {
                    await Task.Delay(Timeout.Infinite, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Shutting down.");
                }
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    /// <summary>
    /// Builds a <see cref="User"/> from the JSON payload of an integration event.
    /// </summary>
    /// <param name="message">JSON text expected to carry <c>id</c>, <c>name</c> and <c>version</c>.</param>
    /// <param name="user">The parsed user, or <c>null</c> when parsing fails.</param>
    /// <returns><c>true</c> when the payload was parsed; otherwise <c>false</c>.</returns>
    private static bool TryParseUser(string message, out User user)
    {
        try
        {
            JObject data = JObject.Parse(message);
            user = new User()
            {
                ID = data["id"].Value<int>(),
                Name = data["name"].Value<string>(),
                Version = data["version"].Value<int>()
            };
            return true;
        }
        catch (Exception e)
        {
            // Invalid JSON, a missing field or a wrongly typed field all end up here.
            Console.WriteLine("Discarding malformed integration event: {0}", e);
            user = null;
            return false;
        }
    }

    /// <summary>
    /// Applies a user event to the data store according to its routing key.
    /// </summary>
    /// <param name="routingKey">Routing key of the delivery.</param>
    /// <param name="user">User data carried by the event.</param>
    private void ApplyUserEvent(string routingKey, User user)
    {
        switch (routingKey)
        {
            case "user.add":
                _dataAccess.AddUser(user);
                break;
            case "user.update":
                _dataAccess.UpdateUser(user);
                break;
            // Any other routing key is ignored; the caller still acknowledges the message.
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment