MediatR vs MassTransit Mediator - Part 2 - Simplify MassTransit Consumers

Read Part 1: MediatR vs MassTransit Mediator - Differences

The MassTransit Mediator implementation is more powerful than MediatR's, but also more complicated to use. Unit testing a MassTransit Consumer is not a pleasant experience.

In this article I propose some techniques to simplify MassTransit Consumers and make them look as straightforward as a MediatR handler.

Case study

We will study a very common use case (the most common one I can think of):

In a Request / Reply communication, when the Consumer is called:

  • we want to log the requests and the responses.
  • if a Business Exception is thrown, it is converted to a proper response and sent back to the sender as a normal response.
  • if the consumer crashes with any other exception, the exception is logged before bubbling up to the sender, which crashes as well.

Data Model

public record BaseResponse
{
    public string ErrorMessage { get; init; }
}
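public class BusinessException : Exception
{
    // The consumers below call new BusinessException("Business error"),
    // so a constructor taking the message is required.
    public BusinessException(string message) : base(message) { }
}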
public record MyRequest { }
public record MyResponse : BaseResponse { }

Our response inherits from BaseResponse so that

  • on success, the ErrorMessage property is null.
  • on a Business Error, the ErrorMessage property is set.

Requirement: the Consumers must focus on the Business Logic and produce the response. So the middleware is responsible for

  • Logging requests and responses.
  • Capturing the Business Error, converting it to a proper response, and gracefully returning it as a normal response.
  • Logging the unknown error in case the consumer crashes (and forwarding the error to the sender).
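The three requirements above can be sketched without any mediator library at all. This is only an illustration under stated assumptions: `RequirementSketch.Run` is a hypothetical helper (it is not part of this article's code), a `List<string>` stands in for a real logger, and `BusinessException` is given a message constructor so that it can carry the error text.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public record BaseResponse
{
    public string ErrorMessage { get; init; }
}

public class BusinessException : Exception
{
    public BusinessException(string message) : base(message) { }
}

public record MyResponse : BaseResponse { }

public static class RequirementSketch
{
    // Hypothetical helper: wraps any handler delegate with the three behaviours.
    // The log list is a stand-in for a real ILogger.
    public static async Task<TResponse> Run<TRequest, TResponse>(
        TRequest request,
        Func<TRequest, Task<TResponse>> handler,
        List<string> log)
        where TResponse : BaseResponse, new()
    {
        log.Add($"REQUEST {request}");
        try
        {
            TResponse response = await handler(request);
            log.Add($"RESPONSE {response}");
            return response;
        }
        catch (BusinessException businessEx)
        {
            // Business error: converted to a normal response, the caller does not crash.
            log.Add($"RESPONSE Business Error: {businessEx.Message}");
            return new TResponse { ErrorMessage = businessEx.Message };
        }
        catch (Exception ex)
        {
            // Unknown error: logged, then forwarded to the caller.
            log.Add($"CRASHED {ex.GetType().Name}");
            throw;
        }
    }
}
```

A handler that throws a BusinessException yields a MyResponse whose ErrorMessage carries the exception message, while any other exception is logged and rethrown to the caller. The MediatR and MassTransit implementations below apply the same shape with their own plumbing.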

MediatR implementation

See the full code

MediatR requires a small change to the Data Model:

public record MyRequest : IRequest<MyResponse> { }

As required by this case study, the Consumer focuses on the Business Logic:

public class MyConsumer : IRequestHandler<MyRequest, MyResponse>
{
    public MyConsumer(ISomeDependentService someDependentService)
    {
    }

    public Task<MyResponse> Handle(MyRequest request, CancellationToken cancellationToken)
    {
        //throw new BusinessException("Business error");
        return Task.FromResult(new MyResponse());
    }
}

and all the common repetitive work is taken care of by the Middleware:

public class LogTryCatchMiddleware<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
    where TResponse : BaseResponse, new()
{
    private readonly ILogger<TRequest> _logger;

    public LogTryCatchMiddleware(ILogger<TRequest> logger)
    {
        _logger = logger;
    }

    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var conversationId = Guid.NewGuid();
        _logger.LogInformation("REQUEST {Request} {ConversationId}", request, conversationId);
        try
        {
            TResponse response = await next();
            _logger.LogInformation("RESPONSE {Response} {ConversationId}", response, conversationId);
            return response;
        }
        catch (BusinessException businessEx)
        {
            var response = new TResponse()
            {
                ErrorMessage = businessEx.Message
            };
            _logger.LogWarning(businessEx, "RESPONSE Business Error {ConversationId}", conversationId);
            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CRASHED while consuming {Request} {ConversationId}", request, conversationId);
            throw;
        }
    }
}
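For the behavior to run, it has to be registered in the container next to the handlers. A minimal sketch, assuming Microsoft.Extensions.DependencyInjection and a MediatR version matching the Handle signature above (the AddMediatR(Type) overload comes from the MediatR.Extensions.Microsoft.DependencyInjection package). The Startup and Caller classes are illustrative names only, and the types from this article are assumed to be in scope:

```csharp
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

public static class Startup
{
    public static void ConfigureServices(IServiceCollection services)
    {
        // Scans the assembly containing MyConsumer for IRequestHandler implementations.
        // ISomeDependentService must be registered separately (not shown).
        services.AddMediatR(typeof(MyConsumer));

        // Open-generic registration: the behavior wraps request/response pairs.
        // Assumption: the container honours the generic constraints on
        // LogTryCatchMiddleware (TResponse : BaseResponse, new()); whether it does
        // depends on the container and its version.
        services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LogTryCatchMiddleware<,>));
    }
}

public class Caller
{
    private readonly IMediator _mediator;

    public Caller(IMediator mediator) => _mediator = mediator;

    public async Task<string> CallAsync()
    {
        // A BusinessException thrown by the handler comes back as ErrorMessage;
        // any other exception propagates to this caller.
        MyResponse response = await _mediator.Send(new MyRequest());
        return response.ErrorMessage ?? "OK";
    }
}
```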

Note: you can argue that we should have one middleware for logging and one middleware for exception handling, to respect the Single Responsibility Principle. It makes sense, but IMO it is not worth complicating these simple requirements. The middleware has 2 responsibilities (logging + exception handling), so what? Does it make things harder to maintain or less efficient?

MassTransit implementation - method 1

See the full code

No need to modify the Data Model (the request doesn't have to implement IRequest).

As required by this case study, the Consumer focuses on the Business Logic:

public class MyConsumer : BaseRequestResponseConsumer<MyRequest, MyResponse>
{
    public MyConsumer(ILogger<MyConsumer> logger, ISomeDependentService someDependentService) : base(logger)
    {
    }

    public override Task<MyResponse> Consume(MyRequest request, CancellationToken cancellationToken)
    {
        //throw new BusinessException("Business error");
        return Task.FromResult(new MyResponse());
    }
}

and all the common repetitive work is taken care of by the BaseRequestResponseConsumer (it is not a Middleware):

public abstract class BaseRequestResponseConsumer<TRequest, TResponse> : IConsumer<TRequest>
    where TRequest : class
    where TResponse : BaseResponse, new()
{
    private readonly ILogger _logger;

    protected BaseRequestResponseConsumer(ILogger logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<TRequest> context)
    {
        TRequest request = context.Message;
        _logger.LogInformation("REQUEST {Request} {ConversationId}", request, context.ConversationId);
        try
        {
            TResponse response = await Consume(request, context.CancellationToken);
            _logger.LogInformation("RESPONSE {Response} {ConversationId}", response, context.ConversationId);
            await context.RespondAsync(response);
        }
        catch (BusinessException businessEx)
        {
            var response = new TResponse()
            {
                ErrorMessage = businessEx.Message
            };
            _logger.LogWarning(businessEx, "RESPONSE Business Error {ConversationId}", context.ConversationId);
            await context.RespondAsync(response);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CRASHED while consuming {Request} {ConversationId}", request, context.ConversationId);
            throw;
        }
    }

    public abstract Task<TResponse> Consume(TRequest request, CancellationToken cancellationToken);
}

Explanation:

  • Compared to the MediatR implementation, the MassTransit version doesn't require modifying the Data Model.
  • The MassTransit Consumer is nearly the same as the MediatR Consumer: the main function we have to implement has identical input/output => so you can now unit test the MassTransit Consumer like any other normal method (no need for the Test Harness).
  • The BaseRequestResponseConsumer is comparable to the MediatR middleware (the main code is nearly the same).
  • A normal MassTransit consumer can "forget" to send back the response, or send back POCO objects of any type as responses. The BaseRequestResponseConsumer ensures that the Consumer always sends back exactly one response of the expected Response Type => it helps make the MassTransit Consumer as straightforward as the MediatR Consumer.
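To illustrate the unit-testing point, here is a minimal sketch of such a test. It assumes xUnit as the test framework and NullLogger from Microsoft.Extensions.Logging.Abstractions (neither is mentioned in this article), with MyConsumer, MyRequest and MyResponse from method 1 in scope. The test class and method names are made up for the example.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;

public class MyConsumerTests
{
    [Fact]
    public async Task Consume_returns_a_response_without_error()
    {
        // No ConsumeContext, bus or Test Harness involved: the consumer is built by hand.
        // Assumption: passing null for ISomeDependentService is acceptable here because
        // the sample constructor does not use it; a real test would pass a fake.
        var consumer = new MyConsumer(NullLogger<MyConsumer>.Instance, null);

        // Calls the business-logic overload directly, like any other async method.
        MyResponse response = await consumer.Consume(new MyRequest(), CancellationToken.None);

        Assert.Null(response.ErrorMessage);
    }
}
```

Note that such a test exercises only the business logic; the logging and exception conversion in the base class run only when the ConsumeContext overload is invoked.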

MassTransit implementation - method 2

See the full code

In this implementation, we use the real MassTransit Middleware capability to do the boring repetitive work.

I personally don't like this implementation and think it is over-engineered. The first method should be enough in most cases. However, this implementation might be useful to other people or for other case studies.

Firstly, we use the same "trick" as MediatR: we create an empty IRequest interface and modify our Data Model to use it.

public interface IRequest<out TResponse> { }
public record MyRequest : IRequest<MyResponse> { }

As required by this case study, the Consumer focuses on the Business Logic:

public class MyConsumer : BaseRequestResponseConsumer<MyRequest, MyResponse>
{
    public MyConsumer(ISomeDependentService someDependentService)
    {
    }

    public override Task<MyResponse> Consume(MyRequest request, CancellationToken cancellationToken)
    {
        throw new BusinessException("Business error");
        //return Task.FromResult(new MyResponse());
    }
}

This time, the BaseRequestResponseConsumer is here only to ensure that the Consumer never forgets to send back exactly one response of the expected Response Type => it helps make the MassTransit Consumer as straightforward as the MediatR Consumer.

public abstract class BaseRequestResponseConsumer<TRequest, TResponse> : IConsumer<TRequest>
    where TRequest : class
    where TResponse : BaseResponse, new()
{
    public async Task Consume(ConsumeContext<TRequest> context)
    {
        TResponse response = await Consume(context.Message, context.CancellationToken);
        await context.RespondAsync(response);
    }

    public abstract Task<TResponse> Consume(TRequest request, CancellationToken cancellationToken);
}

All the common repetitive work is taken care of by the MassTransit Middlewares, and there are 2 of them!

public class LogTryCatchConsumeFilter<TRequest> : IFilter<ConsumeContext<TRequest>> where TRequest : class
{
    private readonly ILogger<TRequest> _logger;

    public LogTryCatchConsumeFilter(ILogger<TRequest> logger)
    {
        _logger = logger;
    }

    public async Task Send(ConsumeContext<TRequest> context, IPipe<ConsumeContext<TRequest>> next)
    {
        _logger.LogInformation("REQUEST {Request} {ConversationId}", context.Message, context.ConversationId);
        try
        {
            await next.Send(context);
        }
        catch (BusinessException businessEx)
        {
            _logger.LogWarning(businessEx, "RESPONSE Business Error {ConversationId}", context.ConversationId);
            var errorResponse = TryBuildErrorResponse(businessEx);
            if (errorResponse is null)
            {
                throw;
            }
            await context.RespondAsync(errorResponse);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CRASHED while consuming {Request} {ConversationId}", context.Message, context.ConversationId);
            throw;
        }
    }
    ...
}

public class LogResponseSendFilter<T> : IFilter<SendContext<T>> where T : class
{
    private readonly ILogger<T> _logger;

    public LogResponseSendFilter(ILogger<T> logger)
    {
        _logger = logger;
    }

    public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        if (context.DestinationAddress?.LocalPath == "/response")
        {
            _logger.LogInformation("RESPONSE {Response} {ConversationId}", context.Message, context.ConversationId);
        }

        return next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}
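Both filters have to be attached to the mediator pipeline. The following is a sketch of how that registration might look, based on the MassTransit mediator configuration API (AddMediator, ConfigureMediator, UseConsumeFilter, UseSendFilter). Namespaces and exact overloads vary between MassTransit versions, so treat it as an assumption to check against the version you use; the Startup class name is illustrative.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class Startup
{
    public static void ConfigureServices(IServiceCollection services)
    {
        services.AddMediator(cfg =>
        {
            cfg.AddConsumer<MyConsumer>();

            cfg.ConfigureMediator((context, mcfg) =>
            {
                // Open-generic filter types: one closed filter is created per message type,
                // with its ILogger resolved from the container.
                mcfg.UseConsumeFilter(typeof(LogTryCatchConsumeFilter<>), context);
                mcfg.UseSendFilter(typeof(LogResponseSendFilter<>), context);
            });
        });
    }
}
```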

Compared to the first method, this implementation has some differences:

  • The Middleware logs ALL the Requests and Responses, even if the Consumer does not inherit from BaseRequestResponseConsumer.
  • In case of a Business Error, both the warning and the final response are logged. In the other two implementations, only the warning is logged in this case.
  • TryBuildErrorResponse: the Middleware makes a best effort to convert the BusinessException to a normal response. As long as the Response Type inherits from BaseResponse and has a default constructor, the middleware can produce the response. Otherwise, it forwards (throws) the BusinessException to the sender.
  • The BaseRequestResponseConsumer (and so all the Consumers) does not need to depend on ILogger. Only the middlewares depend on it => we achieve true separation of concerns in this implementation. In truth, the previous implementation just hides the "plumbing code" behind the base consumer.
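The body of TryBuildErrorResponse is elided in the filter above. One way such a helper might work, given the rules just described, is to read the response type from the IRequest<TResponse> marker via reflection. The sketch below is a hypothetical stand-in written for illustration (the class name ErrorResponseSketch and the signature are assumptions); the actual implementation in the full code may differ.

```csharp
using System;
using System.Linq;

public record BaseResponse
{
    public string ErrorMessage { get; init; }
}

public class BusinessException : Exception
{
    public BusinessException(string message) : base(message) { }
}

public interface IRequest<out TResponse> { }
public record MyResponse : BaseResponse { }
public record MyRequest : IRequest<MyResponse> { }

public static class ErrorResponseSketch
{
    // Hypothetical stand-in for the elided helper.
    // Returns null when no suitable response type can be derived from the request type.
    public static BaseResponse TryBuildErrorResponse(Type requestType, BusinessException businessEx)
    {
        // Look for IRequest<TResponse> on the request type and take TResponse.
        Type responseType = requestType.GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IRequest<>))
            .Select(i => i.GetGenericArguments()[0])
            .FirstOrDefault();

        bool canBuild = responseType != null
            && typeof(BaseResponse).IsAssignableFrom(responseType)
            && responseType.GetConstructor(Type.EmptyTypes) != null;
        if (!canBuild)
        {
            return null;
        }

        var response = (BaseResponse)Activator.CreateInstance(responseType);
        // `with` copies the record and keeps its runtime type (here MyResponse).
        return response with { ErrorMessage = businessEx.Message };
    }
}
```

With these definitions, a MyRequest produces a MyResponse carrying the exception message, while a request type without the IRequest marker yields null, which is the case where the filter rethrows.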

My Personal Evaluation

There are some advantages to the 2nd MassTransit implementation (method 2), but I think that method 1 is more straightforward and easier for the juniors on the team to understand. In any case, the point is that:

The MassTransit consumer can be as simple as the MediatR handler, but way more powerful than the latter. After this experience I'm biased towards the MassTransit mediator.

When putting MediatR side by side with giants like MassTransit or NServiceBus, I suddenly feel like MediatR does not add much value to my application... (It is not true, it's just that the shadows of the giants are too big.) I guess the founder of NServiceBus and some other people got the same feeling... How far can we go with MediatR before switching to something else?

The only advantage MediatR has is its short learning curve; the other implementations require more learning, which might discourage you => the normal tradeoff for more power.

Hold on.. what about Fluent Validation?

The effort to integrate Fluent Validation should be the same in both worlds.

Be careful: if you want to plug Fluent Validation into the middleware so that the inputs are validated automatically then.. please stop. This approach is discouraged by the Fluent Validation team.

And I completely agree with them. You can use this library to write the validation logic and make it reusable, but it is better to explicitly execute the validation logic in each controller / consumer as part of your Business Logic.
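As a rough sketch of what "explicit" validation could look like inside the business logic: the request type CreateOrderRequest, its validator and the CreateOrderLogic class below are hypothetical (MyRequest in this article has no properties to validate), and BusinessException is assumed to have a constructor taking a message. The class is shown standalone; in the setups above it would derive from BaseRequestResponseConsumer.

```csharp
using System.Threading;
using System.Threading.Tasks;
using FluentValidation;
using FluentValidation.Results;

// Hypothetical request with something to validate.
public record CreateOrderRequest
{
    public int Quantity { get; init; }
}

// Reusable validation rules, written once with Fluent Validation.
public class CreateOrderRequestValidator : AbstractValidator<CreateOrderRequest>
{
    public CreateOrderRequestValidator()
    {
        RuleFor(r => r.Quantity).GreaterThan(0);
    }
}

public class CreateOrderLogic
{
    private readonly IValidator<CreateOrderRequest> _validator;

    public CreateOrderLogic(IValidator<CreateOrderRequest> validator) => _validator = validator;

    public Task<MyResponse> Consume(CreateOrderRequest request, CancellationToken cancellationToken)
    {
        // Validation is called explicitly, as the first step of the business logic.
        ValidationResult result = _validator.Validate(request);
        if (!result.IsValid)
        {
            // The base consumer / middleware converts this into a response with ErrorMessage set.
            throw new BusinessException(result.ToString());
        }

        return Task.FromResult(new MyResponse());
    }
}
```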

Though if you still want automatic input validation, here are some NuGet packages for you:
