Skip to content

Instantly share code, notes, and snippets.

@anderly
Last active Aug 23, 2021
Embed
What would you like to do?
MediatR Caching Pipeline Behavior
/// <summary>
/// <see cref="ICache"/> implementation that stores values in an <see cref="IDistributedCache"/>.
/// Every key handed to this class is namespaced with a prefix before it reaches the underlying cache.
/// </summary>
/// <remarks>
/// The prefix is read from the <c>CacheKeyPrefix</c> configuration entry; when that entry is missing
/// the full type name of <see cref="CacheService"/> is used instead.
/// </remarks>
public class CacheService : ICache
{
    // Fallback expiration applied when a caller supplies no expiration at all.
    private static readonly TimeSpan DefaultSlidingExpiration = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan DefaultAbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5);

    private readonly string _keyPrefix;
    private readonly IDistributedCache _cache;
    private readonly IConfiguration _config;

    /// <summary>Creates the service and resolves the key prefix from configuration.</summary>
    /// <param name="cache">The distributed cache that actually stores the entries.</param>
    /// <param name="config">Configuration that may contain a <c>CacheKeyPrefix</c> entry.</param>
    public CacheService(IDistributedCache cache, IConfiguration config)
    {
        _cache = cache;
        _config = config;
        _keyPrefix = _config["CacheKeyPrefix"] ?? typeof(CacheService).FullName;
    }

    /// <summary>Returns <paramref name="key"/> with the configured prefix applied.</summary>
    /// <param name="key">The caller-facing (unprefixed) key.</param>
    /// <returns>The key as it is stored in the underlying cache.</returns>
    public string GetCacheKey(string key)
    {
        return MakeKey(key);
    }

    /// <summary>Gets the value stored under the given key.</summary>
    /// <param name="key">The caller-facing (unprefixed) key.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    public async Task<T> GetAsync<T>(string key, CancellationToken token = default)
    {
        return await GetInternalAsync<T>(key, token);
    }

    /// <summary>Gets the cached value, or creates and stores it using the default expiration.</summary>
    public async Task<T> GetOrCreateAsync<T>(string key, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, null, null, null, factory, token);
    }

    /// <summary>Gets the cached value, or creates and stores it with a sliding expiration.</summary>
    public async Task<T> GetOrCreateAsync<T>(string key, TimeSpan slidingExpiration, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, slidingExpiration, null, null, factory, token);
    }

    /// <summary>Gets the cached value, or creates and stores it with an absolute expiration date.</summary>
    public async Task<T> GetOrCreateAsync<T>(string key, DateTime absoluteExpiration, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, null, absoluteExpiration, null, factory, token);
    }

    /// <summary>Gets the cached value, or creates and stores it with sliding and absolute expiration.</summary>
    public async Task<T> GetOrCreateAsync<T>(string key, TimeSpan slidingExpiration, DateTime absoluteExpiration, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, slidingExpiration, absoluteExpiration, null, factory, token);
    }

    /// <summary>
    /// Gets the cached value, or creates and stores it with a sliding expiration and an absolute
    /// expiration expressed relative to now.
    /// </summary>
    public async Task<T> GetOrCreateAsync<T>(string key, TimeSpan slidingExpiration, TimeSpan absoluteExpirationRelativeToNow, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, slidingExpiration, null, absoluteExpirationRelativeToNow, factory, token);
    }

    /// <summary>Gets the cached value, or creates and stores it with whichever expirations are non-null.</summary>
    public async Task<T> GetOrCreateAsync<T>(string key, TimeSpan? slidingExpiration, DateTime? absoluteExpiration, TimeSpan? absoluteExpirationRelativeToNow, Func<Task<T>> factory, CancellationToken token = default)
    {
        return await GetOrCreateInternalAsync<T>(key, slidingExpiration, absoluteExpiration, absoluteExpirationRelativeToNow, factory, token);
    }

    /// <summary>Stores the value using the default expiration.</summary>
    public async Task SetAsync<T>(string key, T value, CancellationToken token = default)
    {
        await SetInternalAsync(key, value, null, null, null, token);
    }

    /// <summary>Stores the value with an absolute expiration date.</summary>
    public async Task SetAsync<T>(string key, T value, DateTime absoluteExpiration,
        CancellationToken token = default)
    {
        await SetInternalAsync(key, value, null, absoluteExpiration, null, token);
    }

    /// <summary>Stores the value with a sliding expiration.</summary>
    public async Task SetAsync<T>(string key, T value, TimeSpan slidingExpiration,
        CancellationToken token = default)
    {
        await SetInternalAsync(key, value, slidingExpiration, null, null, token);
    }

    /// <summary>Stores the value with an optional sliding expiration and optional absolute expiration date.</summary>
    public async Task SetAsync<T>(string key, T value, TimeSpan? slidingExpiration, DateTime? absoluteExpiration,
        CancellationToken token = default)
    {
        await SetInternalAsync(key, value, slidingExpiration, absoluteExpiration, null, token);
    }

    /// <summary>Stores the value with an optional sliding expiration and optional relative absolute expiration.</summary>
    public async Task SetAsync<T>(string key, T value, TimeSpan? slidingExpiration, TimeSpan? absoluteExpirationRelativeToNow,
        CancellationToken token = default)
    {
        await SetInternalAsync(key, value, slidingExpiration, null, absoluteExpirationRelativeToNow, token);
    }

    /// <summary>Stores the value with whichever of the three expirations are non-null.</summary>
    public async Task SetAsync<T>(string key, T value, TimeSpan? slidingExpiration, DateTime? absoluteExpiration,
        TimeSpan? absoluteExpirationRelativeToNow, CancellationToken token = default)
    {
        await SetInternalAsync(key, value, slidingExpiration, absoluteExpiration, absoluteExpirationRelativeToNow, token);
    }

    /// <summary>Refreshes the entry stored under the given key in the underlying cache.</summary>
    /// <param name="key">The caller-facing (unprefixed) key, exactly as passed to the get/set methods.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    public async Task RefreshAsync(string key, CancellationToken token = default)
    {
        // Entries are written under the prefixed key, so the same prefix must be applied here;
        // refreshing the raw key would target an entry that was never stored.
        await _cache.RefreshAsync(MakeKey(key), token);
    }

    /// <summary>Removes the entry stored under the given key from the underlying cache.</summary>
    /// <param name="key">The caller-facing (unprefixed) key, exactly as passed to the get/set methods.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    public async Task RemoveAsync(string key, CancellationToken token = default)
    {
        // Same reasoning as RefreshAsync: remove the prefixed key that SetInternalAsync wrote.
        await _cache.RemoveAsync(MakeKey(key), token);
    }

    // Builds "<prefix>:<key>", or just "<key>" when the prefix is null, empty or whitespace.
    private string MakeKey(string key)
    {
        return $"{(string.IsNullOrWhiteSpace(_keyPrefix) ? "" : _keyPrefix + ":")}{key}";
    }

    // Reads the value stored under the prefixed key.
    private async Task<T> GetInternalAsync<T>(string key, CancellationToken token = default)
    {
        return await _cache.GetAsync<T>(MakeKey(key), token);
    }

    // Read-through lookup: return the cached value when it is non-null, otherwise invoke the factory
    // and store its result. A null factory result is returned to the caller but never stored.
    private async Task<T> GetOrCreateInternalAsync<T>(string key, TimeSpan? slidingExpiration, DateTime? absoluteExpiration, TimeSpan? absoluteExpirationRelativeToNow, Func<Task<T>> factory, CancellationToken token = default)
    {
        var value = await GetInternalAsync<T>(key, token);
        if (value != null)
        {
            return value;
        }

        value = await factory();
        if (value != null)
        {
            await SetInternalAsync<T>(key, value, slidingExpiration, absoluteExpiration, absoluteExpirationRelativeToNow, token);
        }

        return value;
    }

    // Translates the optional expirations into DistributedCacheEntryOptions and writes the value
    // under the prefixed key. When no expiration is supplied the class-level defaults apply.
    private async Task SetInternalAsync<T>(string key, T value, TimeSpan? slidingExpiration, DateTime? absoluteExpiration, TimeSpan? absoluteExpirationRelativeToNow,
        CancellationToken token = default)
    {
        var cacheEntryOptions = new DistributedCacheEntryOptions();

        if (slidingExpiration.HasValue)
        {
            cacheEntryOptions.SlidingExpiration = slidingExpiration.Value;
        }

        if (absoluteExpiration.HasValue)
        {
            cacheEntryOptions.AbsoluteExpiration = absoluteExpiration.Value;
        }

        if (absoluteExpirationRelativeToNow.HasValue)
        {
            cacheEntryOptions.AbsoluteExpirationRelativeToNow = absoluteExpirationRelativeToNow.Value;
        }

        if (!slidingExpiration.HasValue && !absoluteExpiration.HasValue && !absoluteExpirationRelativeToNow.HasValue)
        {
            cacheEntryOptions.SetSlidingExpiration(DefaultSlidingExpiration);
            cacheEntryOptions.AbsoluteExpirationRelativeToNow = DefaultAbsoluteExpirationRelativeToNow;
        }

        await _cache.SetAsync<T>(MakeKey(key), value, cacheEntryOptions, token);
    }
}
/// <summary>
/// MediatR caching pipeline behavior. When a cache policy is registered for the request type, the
/// response is looked up in the cache first; on a miss the rest of the pipeline runs and its
/// non-null response is stored using the policy's expiration settings.
/// </summary>
/// <remarks>
/// A cache hit is detected with a null check, so a null handler response is never cached.
/// </remarks>
/// <typeparam name="TRequest">The MediatR request type.</typeparam>
/// <typeparam name="TResponse">The response type produced for <typeparamref name="TRequest"/>.</typeparam>
public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<ICachePolicy<TRequest, TResponse>> _cachePolicies;
    // ICache is a helper wrapper over IDistributedCache that adds some read-through cache methods, etc.
    private readonly ICache _cache;
    private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;

    /// <summary>Creates the behavior.</summary>
    /// <param name="cache">Cache used to read and store responses.</param>
    /// <param name="logger">Logger that receives debug messages for cache hits and writes.</param>
    /// <param name="cachePolicies">Policies registered for this request/response pair; only the first one is used.</param>
    public CachingBehavior(ICache cache, ILogger<CachingBehavior<TRequest, TResponse>> logger, IEnumerable<ICachePolicy<TRequest, TResponse>> cachePolicies)
    {
        _cache = cache;
        _logger = logger;
        _cachePolicies = cachePolicies;
    }

    /// <summary>Returns the cached response when present; otherwise runs the pipeline and caches its result.</summary>
    /// <param name="request">The incoming request; the policy derives the cache key from it.</param>
    /// <param name="cancellationToken">Token forwarded to the cache operations.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The cached response, or the response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var cachePolicy = _cachePolicies.FirstOrDefault();
        if (cachePolicy == null)
        {
            // No cache policy found, so just continue through the pipeline
            return await next();
        }

        var cacheKey = cachePolicy.GetCacheKey(request);
        var cachedResponse = await _cache.GetAsync<TResponse>(cacheKey, cancellationToken);
        if (cachedResponse != null)
        {
            // Message templates (not interpolated strings): the default cache key contains '{' and '}',
            // which must be passed as an argument rather than become part of the template itself.
            _logger.LogDebug("Response retrieved {RequestType} from cache. CacheKey: {CacheKey}", typeof(TRequest).FullName, cacheKey);
            return cachedResponse;
        }

        var response = await next();
        if (response == null)
        {
            // A null entry would read back as a miss anyway, so skip the cache write entirely.
            return response;
        }

        _logger.LogDebug("Caching response for {RequestType} with cache key: {CacheKey}", typeof(TRequest).FullName, cacheKey);
        await _cache.SetAsync(cacheKey, response, cachePolicy.SlidingExpiration, cachePolicy.AbsoluteExpiration, cachePolicy.AbsoluteExpirationRelativeToNow, cancellationToken);
        return response;
    }
}
/// <summary>
/// Example feature: look up a single customer by number, with caching enabled simply by
/// declaring a cache policy next to the query and its handler.
/// </summary>
public class Details
{
    /// <summary>Request for the customer that has the given customer number.</summary>
    public class Query : IRequest<Models.Customer>
    {
        [FromRoute]
        public string CustomerNumber { get; set; }
    }

    /// <summary>
    /// Cache policy for <see cref="Query"/>. Declaring an ICachePolicy for the request/response pair
    /// is what turns caching on, much like declaring a FluentValidation validator derived from
    /// AbstractValidator turns on validation. It may live in this file or a separate one, and keeps
    /// caching concerns out of the handler.
    /// </summary>
    public class CachePolicy : ICachePolicy<Query, Models.Customer>
    {
        // Optional overrides of the interface defaults.
        public TimeSpan? SlidingExpiration
        {
            get { return TimeSpan.FromMinutes(1); }
        }

        public TimeSpan? AbsoluteExpirationRelativeToNow
        {
            get { return TimeSpan.FromMinutes(10); }
        }

        // Optional override of the key. The interface default is the request's full type name
        // followed by its properties, e.g. "...Query{CustomerNumber:001425}".
        public string GetCacheKey(Query query) => $"Customers.{query.CustomerNumber}";
    }

    /// <summary>Handles <see cref="Query"/> by reading the customer from the database context.</summary>
    public class Handler : IRequestHandler<Query, Models.Customer>
    {
        private readonly CustomerContext _db;
        // Passed to ProjectTo below; presumably an AutoMapper mapping configuration (confirm).
        private readonly IConfigurationProvider _configuration;

        public Handler(CustomerContext db, IConfigurationProvider configuration)
        {
            _configuration = configuration;
            _db = db;
        }

        /// <summary>Returns the first customer whose number matches the query, or the default value when none does.</summary>
        public async Task<Models.Customer> Handle(Query message, CancellationToken token)
        {
            var matching = _db.Customers.Where(c => c.Number == message.CustomerNumber);
            var projected = matching.ProjectTo<Models.Customer>(_configuration);
            return await projected.FirstOrDefaultAsync(token);
        }
    }
}
}
/// <summary>
/// Typed, asynchronous cache abstraction with plain get/set/refresh/remove operations plus
/// read-through <c>GetOrCreateAsync</c> helpers. CacheService in this file implements it on top of
/// IDistributedCache.
/// </summary>
public interface ICache
{
/// <summary>Returns the key under which the implementation stores the given key.</summary>
/// <remarks>CacheService prepends its configured key prefix.</remarks>
/// <param name="key">The caller-facing key.</param>
/// <returns>The storage key that corresponds to <paramref name="key"/>.</returns>
string GetCacheKey(string key);
/// <summary>Gets a value with the given key.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the located value or null.</returns>
Task<T> GetAsync<T>(string key, CancellationToken token = default);
/// <summary>Gets the value with the given key, or produces it with <paramref name="factory"/> when it is not cached.</summary>
/// <remarks>
/// No expiration is supplied by the caller. In CacheService a non-null factory result is stored
/// with a fallback of 30 seconds sliding and 5 minutes absolute (relative to now) expiration.
/// </remarks>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="factory">Asynchronous callback that produces the value on a cache miss.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the cached or newly produced value.</returns>
Task<T> GetOrCreateAsync<T>(
string key,
Func<Task<T>> factory,
CancellationToken token = default);
/// <summary>Gets the value with the given key, or produces it with <paramref name="factory"/> and stores it with a sliding expiration.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="slidingExpiration">Sliding expiration applied to a newly stored value.</param>
/// <param name="factory">Asynchronous callback that produces the value on a cache miss.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the cached or newly produced value.</returns>
Task<T> GetOrCreateAsync<T>(
string key,
TimeSpan slidingExpiration,
Func<Task<T>> factory,
CancellationToken token = default);
/// <summary>Gets the value with the given key, or produces it with <paramref name="factory"/> and stores it with an absolute expiration date.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="absoluteExpiration">Absolute expiration date applied to a newly stored value.</param>
/// <param name="factory">Asynchronous callback that produces the value on a cache miss.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the cached or newly produced value.</returns>
Task<T> GetOrCreateAsync<T>(
string key,
DateTime absoluteExpiration,
Func<Task<T>> factory,
CancellationToken token = default);
/// <summary>Gets the value with the given key, or produces it with <paramref name="factory"/> and stores it with both a sliding expiration and an absolute expiration date.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="slidingExpiration">Sliding expiration applied to a newly stored value.</param>
/// <param name="absoluteExpiration">Absolute expiration date applied to a newly stored value.</param>
/// <param name="factory">Asynchronous callback that produces the value on a cache miss.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the cached or newly produced value.</returns>
Task<T> GetOrCreateAsync<T>(
string key,
TimeSpan slidingExpiration,
DateTime absoluteExpiration,
Func<Task<T>> factory,
CancellationToken token = default);
/// <summary>Gets the value with the given key, or produces it with <paramref name="factory"/> and stores it with whichever expirations are non-null.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="slidingExpiration">Optional sliding expiration applied to a newly stored value.</param>
/// <param name="absoluteExpiration">Optional absolute expiration date applied to a newly stored value.</param>
/// <param name="absoluteExpirationRelativeToNow">Optional absolute expiration, relative to now, applied to a newly stored value.</param>
/// <param name="factory">Asynchronous callback that produces the value on a cache miss.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation, containing the cached or newly produced value.</returns>
Task<T> GetOrCreateAsync<T>(
string key,
TimeSpan? slidingExpiration,
DateTime? absoluteExpiration,
TimeSpan? absoluteExpirationRelativeToNow,
Func<Task<T>> factory,
CancellationToken token = default);
/// <summary>Sets the value with the given key, without a caller-supplied expiration.</summary>
/// <remarks>CacheService falls back to 30 seconds sliding and 5 minutes absolute (relative to now) expiration.</remarks>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
CancellationToken token = default);
/// <summary>Sets the value with the given key and an absolute expiration date.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="absoluteExpiration">The absolute expiration date for the entry.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
DateTime absoluteExpiration,
CancellationToken token = default);
/// <summary>Sets the value with the given key and a sliding expiration.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="slidingExpiration">The sliding expiration for the entry.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
TimeSpan slidingExpiration,
CancellationToken token = default);
/// <summary>Sets the value with the given key, an optional sliding expiration and an optional absolute expiration date.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="slidingExpiration">The sliding expiration for the entry, or null for none.</param>
/// <param name="absoluteExpiration">The absolute expiration date for the entry, or null for none.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
TimeSpan? slidingExpiration,
DateTime? absoluteExpiration,
CancellationToken token = default);
/// <summary>Sets the value with the given key, an optional sliding expiration and an optional absolute expiration relative to now.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="slidingExpiration">The sliding expiration for the entry, or null for none.</param>
/// <param name="absoluteExpirationRelativeToNow">The absolute expiration for the entry, relative to now, or null for none.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
TimeSpan? slidingExpiration,
TimeSpan? absoluteExpirationRelativeToNow,
CancellationToken token = default);
/// <summary>Sets the value with the given key and whichever of the three expirations are non-null.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="value">The value to set in the cache.</param>
/// <param name="slidingExpiration">The sliding expiration for the entry, or null for none.</param>
/// <param name="absoluteExpiration">The absolute expiration date for the entry, or null for none.</param>
/// <param name="absoluteExpirationRelativeToNow">The absolute expiration for the entry, relative to now, or null for none.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task SetAsync<T>(
string key,
T value,
TimeSpan? slidingExpiration,
DateTime? absoluteExpiration,
TimeSpan? absoluteExpirationRelativeToNow,
CancellationToken token = default);
/// <summary>
/// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
/// </summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task RefreshAsync(string key, CancellationToken token = default);
/// <summary>Removes the value with the given key.</summary>
/// <param name="key">A string identifying the requested value.</param>
/// <param name="token">Optional. The <see cref="T:System.Threading.CancellationToken" /> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="T:System.Threading.Tasks.Task" /> that represents the asynchronous operation.</returns>
Task RemoveAsync(string key, CancellationToken token = default);
}
// Using C# 8.0 default interface members so that a policy only overrides what it needs.
// Optionally, could move this to an AbstractCachingPolicy like AbstractValidator.
/// <summary>
/// Describes how responses for a request type are cached: the expiration settings and the cache key.
/// Every member has a default implementation, so an empty implementing class is a valid policy.
/// </summary>
/// <typeparam name="TRequest">The MediatR request type the policy applies to.</typeparam>
/// <typeparam name="TResponse">The response type produced for <typeparamref name="TRequest"/>.</typeparam>
public interface ICachePolicy<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    /// <summary>Absolute expiration date for the entry; none by default.</summary>
    DateTime? AbsoluteExpiration => null;

    /// <summary>Absolute expiration relative to the time of caching; 5 minutes by default.</summary>
    TimeSpan? AbsoluteExpirationRelativeToNow => TimeSpan.FromMinutes(5);

    /// <summary>Sliding expiration for the entry; 30 seconds by default.</summary>
    TimeSpan? SlidingExpiration => TimeSpan.FromSeconds(30);

    /// <summary>
    /// Builds the cache key for a request. The default is the full name of
    /// <typeparamref name="TRequest"/> followed by the request's readable, non-indexed public
    /// properties as <c>{Name:Value,Name:Value}</c>, ordered by property name.
    /// </summary>
    /// <param name="request">The request to build a key for.</param>
    /// <returns>The cache key.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    string GetCacheKey(TRequest request)
    {
        if (request == null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        var props = request.GetType()
            .GetProperties()
            // Indexers and write-only properties cannot be read with a parameterless GetValue.
            .Where(pi => pi.CanRead && pi.GetIndexParameters().Length == 0)
            // Reflection does not guarantee property order; sort so equal requests always yield the same key.
            .OrderBy(pi => pi.Name, StringComparer.Ordinal)
            .Select(pi => $"{pi.Name}:{pi.GetValue(request, null)}");

        return $"{typeof(TRequest).FullName}{{{String.Join(",", props)}}}";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment