Skip to content

Instantly share code, notes, and snippets.

@cheoalfredo
Created March 24, 2022 13:19
Show Gist options
  • Save cheoalfredo/fda2627076b13180ffea7a253ff321ea to your computer and use it in GitHub Desktop.
Save cheoalfredo/fda2627076b13180ffea7a253ff321ea to your computer and use it in GitHub Desktop.
Background process that consumes messages and adds spans to the trace
/// <summary>Source of the activities (spans) started by this worker; registered under the name "Worker".</summary>
private static readonly ActivitySource ActivitySource = new("Worker");

/// <summary>Trace-context propagator used to extract the parent context from incoming message properties.</summary>
private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

/// <summary>Name given to the consumer activity started for each received message.</summary>
private const string ACTIVITY_NAME = "Launching handler to process request";
/// <summary>
/// Declares the queue, attaches a consumer and starts consuming messages. For every delivery the
/// propagated trace context is extracted from the message properties, a consumer activity is
/// started under that parent, the body is deserialized into a <c>Person</c> and a
/// <c>CreatePersonAsyncCommand</c> is sent through the mediator.
/// </summary>
/// <param name="stoppingToken">
/// Token signalled when the host is stopping; it is checked on entry and forwarded to the mediator call.
/// </param>
/// <returns>A task that completes once the consumer has been registered.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="stoppingToken"/> is already cancelled on entry.
/// </exception>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();

    _model.QueueDeclare(_queueName, false, false, false, null);

    var consumer = new EventingBasicConsumer(_model);
    consumer.Received += async (sender, e) =>
    {
        // This lambda is bound to a void-returning event, so it compiles to `async void`:
        // nothing awaits it and an escaping exception would be unobserved and could take
        // the process down. Every failure is therefore handled here and turned into a nack.
        try
        {
            var parentContext = Propagator.Extract(default, e.BasicProperties, ExtractHeaders);
            Baggage.Current = parentContext.Baggage;

            using (var activity = ActivitySource.StartActivity(ACTIVITY_NAME, ActivityKind.Consumer, parentContext.ActivityContext))
            {
                try
                {
                    // Decode straight from the span; no intermediate byte[] copy is needed.
                    var payload = JsonSerializer.Deserialize<Person>(Encoding.UTF8.GetString(e.Body.Span));
                    if (payload is not null)
                    {
                        await _mediator.Send(
                            new CreatePersonAsyncCommand(payload.FirstName, payload.LastName, payload.Email),
                            stoppingToken);
                    }
                }
                catch (Exception ex)
                {
                    // Mark the span as failed so the error is visible in the trace
                    // (tag names follow the OpenTelemetry status-tag convention), then
                    // let the outer handler decide how to settle the delivery.
                    activity?.SetTag("otel.status_code", "ERROR");
                    activity?.SetTag("otel.status_description", ex.Message);
                    throw;
                }
            }

            // Acknowledge only after the message has been handled successfully.
            _model.BasicAck(e.DeliveryTag, false);
        }
        catch (System.Text.Json.JsonException)
        {
            // A malformed body can never succeed on retry: reject without requeue so it
            // does not loop forever (it is dead-lettered if the queue is configured for it).
            _model.BasicNack(e.DeliveryTag, false, false);
        }
        catch (Exception)
        {
            // Handler failure or shutdown cancellation: hand the message back to the broker
            // for redelivery instead of leaving it unacknowledged.
            _model.BasicNack(e.DeliveryTag, false, true);
        }
    };

    // autoAck is false: deliveries are settled explicitly in the handler above.
    _model.BasicConsume(_queueName, false, consumer);
    await Task.CompletedTask;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment