Skip to content

Instantly share code, notes, and snippets.

@anderly
Last active Dec 12, 2019
Embed
What would you like to do?
MediatR Retry Pipeline Behavior
/// <summary>
/// Handles a <see cref="Query"/> by looking up the customer whose number matches
/// the one carried by the query.
/// </summary>
/// <remarks>
/// The class also implements <c>IRetryableRequest&lt;Query, Customer&gt;</c>; the
/// properties it exposes for that interface control how <see cref="Handle"/> is retried.
/// </remarks>
public class Handler : IRequestHandler<Query, Customer>, IRetryableRequest<Query, Customer>
{
    private readonly CustomerContext _db;
    private readonly IConfigurationProvider _configuration;

    /// <summary>Creates the handler with its data context and mapping configuration.</summary>
    /// <param name="db">Context exposing the <c>Customers</c> set that is queried.</param>
    /// <param name="configuration">Configuration handed to <c>ProjectTo</c> for the projection.</param>
    public Handler(CustomerContext db, IConfigurationProvider configuration)
        => (_db, _configuration) = (db, configuration);

    // Retry settings surfaced through IRetryableRequest<TRequest, TResponse>.

    /// <summary>Number of retries requested for this handler.</summary>
    public int RetryAttempts { get => 2; }

    /// <summary>Base delay between retries, in milliseconds.</summary>
    public int RetryDelay { get => 500; }

    /// <summary>Whether the delay should grow exponentially with each retry.</summary>
    public bool RetryWithExponentialBackoff { get => true; }

    /// <summary>Exception count configured for the circuit-breaker policy.</summary>
    public int ExceptionsAllowedBeforeCircuitTrip { get => 2; }

    /// <summary>
    /// Filters customers by <c>message.CustomerNumber</c>, projects the match to
    /// <c>Models.Customer</c>, and returns the first result (or the default value
    /// when nothing matches).
    /// </summary>
    /// <param name="message">Query carrying the customer number to look up.</param>
    /// <param name="token">Cancellation token forwarded to the asynchronous query.</param>
    public async Task<Customer> Handle(Query message, CancellationToken token)
    {
        var matchingCustomers = _db.Customers.Where(c => c.Number == message.CustomerNumber);
        var projected = matchingCustomers.ProjectTo<Models.Customer>(_configuration);

        return await projected.FirstOrDefaultAsync(token);
    }
}
/// <summary>
/// Retry settings for a request of type <typeparamref name="TRequest"/> that yields
/// <typeparamref name="TResponse"/>. Every member has a default implementation, so an
/// implementer only needs to declare the values it wants to change.
/// </summary>
/// <typeparam name="TRequest">Request type the settings apply to.</typeparam>
/// <typeparam name="TResponse">Response type produced for <typeparamref name="TRequest"/>.</typeparam>
public interface IRetryableRequest<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    /// <summary>How many times a failed call is retried. Defaults to 1.</summary>
    int RetryAttempts { get => 1; }

    /// <summary>Base wait between retries, in milliseconds. Defaults to 250.</summary>
    int RetryDelay { get => 250; }

    /// <summary>
    /// When true, the wait is multiplied by 2 raised to the retry number instead of
    /// staying fixed. Defaults to false.
    /// </summary>
    bool RetryWithExponentialBackoff { get => false; }

    /// <summary>Exception count given to the circuit-breaker policy. Defaults to 1.</summary>
    int ExceptionsAllowedBeforeCircuitTrip { get => 1; }
}
/// <summary>
/// MediatR pipeline behavior that retries the rest of the pipeline when it throws,
/// using the settings of the first registered
/// <see cref="IRetryableRequest{TRequest, TResponse}"/> for the request/response pair.
/// When none is registered the request passes through untouched.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced for the request.</typeparam>
public class RetryBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IRetryableRequest<TRequest, TResponse>> _retryHandlers;
    private readonly ILogger<RetryBehavior<TRequest, TResponse>> _logger;

    /// <summary>Creates the behavior.</summary>
    /// <param name="retryHandlers">Retry settings registered for this request/response pair; may be empty.</param>
    /// <param name="logger">Logger used for retry and circuit-trip debug messages.</param>
    public RetryBehavior(IEnumerable<IRetryableRequest<TRequest, TResponse>> retryHandlers, ILogger<RetryBehavior<TRequest, TResponse>> logger)
    {
        _retryHandlers = retryHandlers;
        _logger = logger;
    }

    /// <summary>
    /// Runs <paramref name="next"/>, retrying on any <see cref="Exception"/> according to
    /// the retry settings found for the request.
    /// </summary>
    /// <param name="request">The request being handled.</param>
    /// <param name="cancellationToken">Cancellation token supplied by the pipeline.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var retryHandler = _retryHandlers.FirstOrDefault();
        if (retryHandler == null)
        {
            // No retry handler found, continue through pipeline
            return await next();
        }

        // NOTE(review): this circuit breaker is constructed but never applied to the
        // execution below, so ExceptionsAllowedBeforeCircuitTrip does not influence the
        // retries. Wiring it in (e.g. wrapping it with the retry policy) changes which
        // exception reaches callers once the circuit opens, and a breaker created per
        // call cannot share state across requests — decide on both before enabling it.
        var circuitBreaker = Policy<TResponse>
            .Handle<Exception>()
            .CircuitBreakerAsync(retryHandler.ExceptionsAllowedBeforeCircuitTrip, TimeSpan.FromMilliseconds(5000),
                (exception, things) =>
                {
                    _logger.LogDebug("Circuit Tripped!");
                },
                () =>
                {
                });

        var retryPolicy = Policy<TResponse>
            .Handle<Exception>()
            .WaitAndRetryAsync(retryHandler.RetryAttempts, retryAttempt =>
            {
                // Fixed: the constant-delay branch referenced an undeclared `_handler`;
                // both branches now read the delay from the resolved retry handler.
                var retryDelay = retryHandler.RetryWithExponentialBackoff
                    ? TimeSpan.FromMilliseconds(Math.Pow(2, retryAttempt) * retryHandler.RetryDelay)
                    : TimeSpan.FromMilliseconds(retryHandler.RetryDelay);

                _logger.LogDebug($"Retrying, waiting {retryDelay}...");

                return retryDelay;
            });

        var response = await retryPolicy.ExecuteAsync(async () => await next());

        return response;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment