Skip to content

Instantly share code, notes, and snippets.

@brantburnett
Last active May 10, 2022 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brantburnett/9a39ae2077bc0203c17068c517564b8f to your computer and use it in GitHub Desktop.
Save brantburnett/9a39ae2077bc0203c17068c517564b8f to your computer and use it in GitHub Desktop.
RabbitMQ message bus OpenTelemetry tracing example
using System;
using System.Collections;
using System.Collections.Generic;
namespace CenterEdge.Common.Events
{
/// <summary>
/// Collection of properties associated with the event, such as tracing headers.
/// </summary>
public class EventProperties : IDictionary<string, string?>
{
private readonly Dictionary<string, string?> _inner = new(StringComparer.OrdinalIgnoreCase);
private IDictionary<string, string?> InnerAsIDictionary => _inner;
/// <inheritdoc />
public IEnumerator<KeyValuePair<string, string?>> GetEnumerator()
{
return _inner.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)_inner).GetEnumerator();
}
void ICollection<KeyValuePair<string, string?>>.Add(KeyValuePair<string, string?> item)
{
InnerAsIDictionary.Add(item);
}
/// <inheritdoc />
public void Clear()
{
_inner.Clear();
}
bool ICollection<KeyValuePair<string, string?>>.Contains(KeyValuePair<string, string?> item)
{
return InnerAsIDictionary.Contains(item);
}
void ICollection<KeyValuePair<string, string?>>.CopyTo(KeyValuePair<string, string?>[] array, int arrayIndex)
{
InnerAsIDictionary.CopyTo(array, arrayIndex);
}
bool ICollection<KeyValuePair<string, string?>>.Remove(KeyValuePair<string, string?> item)
{
return InnerAsIDictionary.Remove(item);
}
/// <inheritdoc />
public int Count => _inner.Count;
/// <inheritdoc />
public bool IsReadOnly => InnerAsIDictionary.IsReadOnly;
/// <inheritdoc />
public void Add(string key, string? value)
{
_inner.Add(key, value);
}
/// <inheritdoc />
public bool ContainsKey(string key)
{
return _inner.ContainsKey(key);
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using CenterEdge.Common.Events;
using CenterEdge.Common.MessageBus.Events;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
namespace CenterEdge.Common.MessageBus
{
/// <summary>
/// Abstract base implementation for an <see cref="IEventPublisher"/> which includes standard handling
/// like tracing propagation.
/// </summary>
public abstract class EventPublisherBase : IEventPublisher
{
/// <summary>
/// Logger.
/// </summary>
protected ILogger Logger { get; }
/// <summary>
/// Create a new EventPublisherBase.
/// </summary>
/// <param name="logger">Logger.</param>
/// <exception cref="ArgumentNullException"><paramref name="eventContextProvider"/> or <paramref name="logger"/> is null.</exception>
protected EventPublisherBase(ILogger logger)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task PublishEventAsync<T>(T e, CancellationToken cancellationToken = default) where T : class, IServiceEvent
{
if (e == null)
{
throw new ArgumentNullException(nameof(e));
}
if (string.IsNullOrWhiteSpace(e.Topic))
{
throw new ArgumentException("Topic is required", nameof(e));
}
try
{
e.Properties ??= new EventProperties();
using (var activity = MessageBusActivitySources.PublishActivitySource.StartActivity($"{typeof(T).FullName} send", ActivityKind.Producer))
{
// Depending on Sampling (and whether a listener is registered or not), the activity above may not be created.
// If it is created, then propagate its context. If it is not created, the propagate the Current context, if any.
ActivityContext contextToInject = activity?.Context ?? Activity.Current?.Context ?? default;
// Inject the activity context into the message properties to propagate trace context
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(contextToInject, Baggage.Current), e.Properties, InjectTraceContext);
await PublishEventCoreAsync(e, activity, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
var message = $"Error publishing event '{e.GetType()}' to message bus";
Logger.LogError(ex, message);
throw new MessageBusPublishException(message, ex);
}
}
/// <summary>
/// Core implementation for publishing an event. This is called by <see cref="PublishEventAsync{T}"/> after
/// contract validation, tracing propagation, etc.
/// </summary>
/// <typeparam name="T">The type of event, must implement from <see cref="IServiceEvent"/>.</typeparam>
/// <param name="e">The event.</param>
/// <param name="activity">Tracing activity.</param>
/// <param name="cancellationToken">Cancellation token.</param>
protected abstract Task PublishEventCoreAsync<T>(T e, Activity? activity, CancellationToken cancellationToken = default) where T : class, IServiceEvent;
// PublishEventCoreAsync would have RabbitMQ specifics in its implementation,
// including a call to RabbitMQHelpers.AddMessageTags to fill in standardized tags on the activity
private static void InjectTraceContext(EventProperties properties, string key, string value)
{
properties[key] = value;
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using CenterEdge.Common.Events;
using CenterEdge.Common.MessageBus.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
namespace CenterEdge.Common.MessageBus
{
/// <summary>
/// Base implementation for <see cref="IEventSubscriber" /> which includes standard handling like tracing propagation.
/// </summary>
/// <typeparam name="T">Type of event, must implement <see cref="IServiceEvent"/>.</typeparam>
public abstract class EventSubscriberBase<T> : IEventSubscriber
where T : class, IServiceEvent
{
private readonly EventDelegate _eventHandler;
private readonly IServiceProvider _serviceProvider;
private readonly string _activityName = $"{typeof(T).FullName} receive";
/// <summary>
/// Creates a new EventSubscriberBase.
/// </summary>
/// <param name="eventHandler">Delegate that will process the event.</param>
/// <param name="serviceProvider">Service provider for supporting services.</param>
/// <exception cref="ArgumentNullException"><paramref name="eventHandler" /> or <paramref name="serviceProvider" /> is null.</exception>
protected EventSubscriberBase(EventDelegate eventHandler, IServiceProvider serviceProvider)
{
_eventHandler = eventHandler ?? throw new ArgumentNullException(nameof(eventHandler));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
/// <summary>
/// Handles an event received from the bus, called by the inherited class after it applies its processing.
/// </summary>
/// <param name="message">The event to handle.</param>
/// <param name="internalMessage">The internal message object received from the bus.</param>
protected virtual async Task HandleEvent(T message, object? internalMessage)
{
var parentContext =
Propagators.DefaultTextMapPropagator.Extract(default, message.Properties, ExtractTraceContext);
Baggage.Current = parentContext.Baggage;
using (var activity = MessageBusActivitySources.ReceiveActivitySource.StartActivity(_activityName, ActivityKind.Consumer,
parentContext.ActivityContext))
{
AddActivityTags(activity, message);
using (var scope = _serviceProvider.CreateScope())
{
var context = new DefaultEventContext<T>(scope.ServiceProvider, message, internalMessage);
var eventContextProvider = scope.ServiceProvider.GetRequiredService<IEventContextProvider>();
eventContextProvider.CurrentEvent = context;
await _eventHandler(context).ConfigureAwait(false);
}
}
}
/// <summary>
/// Add any provider-specific tags to the trace activity.
/// </summary>
/// <param name="activity">The activity to update. May be null.</param>
/// <param name="message">The message received from the bus.</param>
protected virtual void AddActivityTags(Activity? activity, T message)
{
// For the RabbitMQ implementation of this abstract, it would call
// RabbitMQHelpers.AddMessageTags to fill in standardized tags on the activity
}
/// <inheritdoc />
public abstract void Dispose();
private static IEnumerable<string?> ExtractTraceContext(EventProperties? properties, string key)
{
if (properties != null && properties.TryGetValue(key, out var value))
{
return new[] { value };
}
return Enumerable.Empty<string?>();
}
}
}
using System.Collections.Generic;
namespace CenterEdge.Common.Events
{
/// <summary>
/// Base interface for service events.
/// </summary>
public interface IServiceEvent
{
/// <summary>
/// Topic of this event.
/// </summary>
string Topic { get; set; }
/// <summary>
/// Properties related to the event, such as tracing headers.
/// </summary>
EventProperties? Properties { get; set; }
}
}
using System;
using System.Diagnostics;
namespace CenterEdge.Common.MessageBus
{
/// <summary>
/// Activity sources related to tracing for the message bus.
/// </summary>
public static class MessageBusActivitySources
{
/// <summary>
/// Name of the activity source for message publishing.
/// </summary>
public const string PublishActivitySourceName = "CenterEdge.Common.MessageBus.Publish";
/// <summary>
/// Name of the activity source for message receiving.
/// </summary>
public const string ReceiveActivitySourceName = "CenterEdge.Common.MessageBus.Receive";
internal static readonly ActivitySource PublishActivitySource = new(PublishActivitySourceName, "1.0.0");
internal static readonly ActivitySource ReceiveActivitySource = new(ReceiveActivitySourceName, "1.0.0");
}
}
using System;
using OpenTelemetry.Trace;
namespace CenterEdge.Common.MessageBus
{
/// <summary>
/// Extensions for <see cref="TracerProviderBuilder" />.
/// </summary>
public static class MessageBusTracerProviderBuilderExtensions
{
/// <summary>
/// Add instrumentation for the message bus.
/// </summary>
/// <param name="builder">The <see cref="TracerProviderBuilder" />.</param>
/// <returns>The <see cref="TracerProviderBuilder" />.</returns>
public static TracerProviderBuilder AddMessageBusInstrumentation(this TracerProviderBuilder builder)
{
// ReSharper disable once SuspiciousTypeConversion.Global
if (builder is IDeferredTracerProviderBuilder deferredTracerProviderBuilder)
{
return deferredTracerProviderBuilder.Configure(static (_, deferredBuilder) =>
{
deferredBuilder.AddSource(MessageBusActivitySources.PublishActivitySourceName, MessageBusActivitySources.ReceiveActivitySourceName);
});
}
return builder.AddSource(MessageBusActivitySources.PublishActivitySourceName, MessageBusActivitySources.ReceiveActivitySourceName);
}
}
}
using System;
using System.Diagnostics;
using CenterEdge.Common.Events;
using EasyNetQ;
namespace CenterEdge.Common.MessageBus.RabbitMQ.Events.Internal
{
internal static class RabbitMQHelpers
{
public static void AddMessageTags(Activity? activity, IBus bus, IServiceEvent message)
{
if (activity != null)
{
// These tags are added demonstrating the semantic conventions of the OpenTelemetry messaging specification
// See:
// * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
// * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#rabbitmq
activity.SetTag("messaging.system", "rabbitmq");
activity.SetTag("messaging.destination_kind", "queue");
activity.SetTag("messaging.destination", bus.Advanced.Conventions.ExchangeNamingConvention(message.GetType()));
activity.SetTag("messaging.rabbitmq.routing_key", message.Topic);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment