Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sam9291/f79f5114ff2abdb0bb9fa948fce50364 to your computer and use it in GitHub Desktop.
Save sam9291/f79f5114ff2abdb0bb9fa948fce50364 to your computer and use it in GitHub Desktop.
Example of a custom infinite MassTransit retry policy in .NET Core.
using GreenPipes;
using GreenPipes.Policies;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
namespace Consumer.ConsumerDefinitions;
/// <summary>
/// Consumer definition that registers message retry on the receive endpoint using
/// <see cref="InfinitRetryPolicy"/> with a three-second interval.
/// </summary>
/// <typeparam name="T">The consumer type this definition applies to.</typeparam>
public class InfinitRetryDefinition<T> : ConsumerDefinition<T>
    where T : class, IConsumer
{
    /// <summary>
    /// Adds the retry middleware to the endpoint configurator. The consumer configurator
    /// is not touched here.
    /// </summary>
    /// <param name="endpointConfigurator">Receive endpoint the retry filter is attached to.</param>
    /// <param name="consumerConfigurator">Unused by this definition.</param>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<T> consumerConfigurator)
    {
        // Same value for every policy instance the factory delegate produces.
        var retryInterval = TimeSpan.FromSeconds(3);

        endpointConfigurator.UseMessageRetry(retry =>
            retry.SetRetryPolicy(exceptionFilter => new InfinitRetryPolicy(exceptionFilter, retryInterval)));
    }
}
/// <summary>
/// Retry policy that combines an exception filter with a single fixed interval.
/// The policy itself holds no attempt limit; it only exposes the interval and
/// answers whether a given exception matches the filter.
/// </summary>
public class InfinitRetryPolicy : IRetryPolicy
{
    readonly IExceptionFilter _filter;

    /// <summary>Creates the policy.</summary>
    /// <param name="filter">Filter consulted by <see cref="IsHandled"/> and probed alongside this policy.</param>
    /// <param name="interval">Value exposed through <see cref="Interval"/>.</param>
    public InfinitRetryPolicy(IExceptionFilter filter, TimeSpan interval)
    {
        Interval = interval;
        _filter = filter;
    }

    /// <summary>The interval supplied at construction.</summary>
    public TimeSpan Interval { get; }

    /// <summary>
    /// Writes the policy name and interval into the probe context, then lets the
    /// exception filter add its own details.
    /// </summary>
    void IProbeSite.Probe(ProbeContext context)
    {
        var description = new
        {
            Policy = "Infinite",
            Interval
        };

        context.Set(description);
        _filter.Probe(context);
    }

    /// <summary>Wraps <paramref name="context"/> in a policy context bound to this policy.</summary>
    RetryPolicyContext<T> IRetryPolicy.CreatePolicyContext<T>(T context) =>
        new InfinitRetryPolicyContext<T>(this, context);

    /// <summary>Returns the filter's match result for <paramref name="exception"/>.</summary>
    public bool IsHandled(Exception exception) => _filter.Match(exception);

    /// <summary>Returns the fixed display name of this policy.</summary>
    public override string ToString() => "Infinit";
}
/// <summary>
/// Policy context for <see cref="InfinitRetryPolicy"/>; its job in this file is to
/// produce the first <see cref="InfinitRetryContext{TContext}"/> when an exception occurs.
/// </summary>
/// <typeparam name="TContext">The pipe context type being retried.</typeparam>
public class InfinitRetryPolicyContext<TContext> : BaseRetryPolicyContext<TContext>
    where TContext : class, PipeContext
{
    readonly InfinitRetryPolicy _policy;

    /// <summary>Binds the policy and the pipe context, forwarding both to the base class.</summary>
    public InfinitRetryPolicyContext(InfinitRetryPolicy policy, TContext context)
        : base(policy, context) => _policy = policy;

    /// <summary>
    /// Builds the initial retry context for <paramref name="exception"/>, starting
    /// with a retry count of zero.
    /// </summary>
    protected override RetryContext<TContext> CreateRetryContext(Exception exception, CancellationToken cancellationToken)
    {
        const int initialRetryCount = 0;

        return new InfinitRetryContext<TContext>(
            _policy,
            Context,
            exception,
            initialRetryCount,
            cancellationToken);
    }
}
/// <summary>
/// Retry context for <see cref="InfinitRetryPolicy"/>. Whether another retry is allowed
/// depends only on the policy's exception filter; no retry-count check is made here.
/// </summary>
/// <typeparam name="TContext">The pipe context type being retried.</typeparam>
public class InfinitRetryContext<TContext> :
    BaseRetryContext<TContext>,
    RetryContext<TContext>
    where TContext : class, PipeContext
{
    readonly InfinitRetryPolicy _policy;

    /// <summary>Creates a retry context, forwarding everything except the policy to the base class.</summary>
    public InfinitRetryContext(
        InfinitRetryPolicy policy,
        TContext context,
        Exception exception,
        int retryCount,
        CancellationToken cancellationToken)
        : base(context, exception, retryCount, cancellationToken)
    {
        _policy = policy;
    }

    /// <summary>The policy's interval, regardless of the current retry count.</summary>
    public override TimeSpan? Delay
    {
        get { return _policy.Interval; }
    }

    /// <summary>
    /// Always produces a follow-up context with the retry count incremented by one and
    /// reports whether the policy's filter handles <paramref name="exception"/>.
    /// </summary>
    bool RetryContext<TContext>.CanRetry(Exception exception, out RetryContext<TContext> retryContext)
    {
        var nextRetryCount = RetryCount + 1;

        // NOTE(review): the next context is given this context's stored Exception, not the
        // exception argument; only the filter check uses the argument. Confirm this is intended.
        retryContext = new InfinitRetryContext<TContext>(
            _policy,
            Context,
            Exception,
            nextRetryCount,
            CancellationToken);

        return _policy.IsHandled(exception);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment