Created
June 27, 2023 18:08
-
-
Save sam9291/f79f5114ff2abdb0bb9fa948fce50364 to your computer and use it in GitHub Desktop.
Example of a custom infinite MassTransit retry policy in .NET Core.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using GreenPipes; | |
using GreenPipes.Policies; | |
using MassTransit; | |
using MassTransit.ConsumeConfigurators; | |
using MassTransit.Definition; | |
namespace Consumer.ConsumerDefinitions; | |
/// <summary>
/// Consumer definition that adds message retry, driven by an
/// <see cref="InfinitRetryPolicy"/> with a three-second interval, to the receive
/// endpoint of consumer <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The consumer type this definition applies to.</typeparam>
public class InfinitRetryDefinition<T> : ConsumerDefinition<T>
    where T : class, IConsumer
{
    /// <summary>
    /// Registers the retry middleware on the endpoint.
    /// </summary>
    /// <param name="endpointConfigurator">Receive endpoint the retry middleware is added to.</param>
    /// <param name="consumerConfigurator">Consumer configurator; not used by this definition.</param>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<T> consumerConfigurator)
    {
        // Fixed pause handed to every policy instance created below.
        var retryInterval = TimeSpan.FromSeconds(3);

        endpointConfigurator.UseMessageRetry(
            retry => retry.SetRetryPolicy(
                exceptionFilter => new InfinitRetryPolicy(exceptionFilter, retryInterval)));
    }
}
/// <summary>
/// Retry policy that waits a fixed <see cref="Interval"/> between attempts.
/// The policy itself imposes no attempt limit; it only reports whether an
/// exception is accepted by the configured exception filter.
/// </summary>
public class InfinitRetryPolicy :
    IRetryPolicy
{
    readonly IExceptionFilter _filter;

    /// <summary>
    /// Creates the policy.
    /// </summary>
    /// <param name="filter">Filter deciding which exceptions are handled by this policy.</param>
    /// <param name="interval">Delay between retry attempts; must not be negative.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="interval"/> is negative.</exception>
    public InfinitRetryPolicy(IExceptionFilter filter, TimeSpan interval)
    {
        // Fail fast here instead of with a NullReferenceException in Probe/IsHandled later on.
        _filter = filter ?? throw new ArgumentNullException(nameof(filter));

        // A negative delay cannot be awaited meaningfully; reject it at construction
        // rather than letting it surface inside the retry pipeline.
        if (interval < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(interval), interval, "The retry interval must not be negative.");
        }

        Interval = interval;
    }

    /// <summary>Delay applied between retry attempts.</summary>
    public TimeSpan Interval { get; }

    /// <summary>
    /// Writes the policy name and interval to the probe context, then lets the
    /// exception filter add its own details.
    /// </summary>
    void IProbeSite.Probe(ProbeContext context)
    {
        context.Set(new
        {
            Policy = "Infinite",
            Interval
        });

        _filter.Probe(context);
    }

    /// <summary>
    /// Creates the policy context that wraps <paramref name="context"/> for this policy.
    /// </summary>
    RetryPolicyContext<T> IRetryPolicy.CreatePolicyContext<T>(T context)
    {
        return new InfinitRetryPolicyContext<T>(this, context);
    }

    /// <summary>
    /// Returns whether the exception filter matches <paramref name="exception"/>.
    /// </summary>
    public bool IsHandled(Exception exception)
    {
        return _filter.Match(exception);
    }

    /// <summary>Returns the display name of the policy.</summary>
    public override string ToString() => "Infinit";
}
/// <summary>
/// Policy context for <see cref="InfinitRetryPolicy"/>; produces the first
/// <see cref="InfinitRetryContext{TContext}"/> when an exception is reported.
/// </summary>
/// <typeparam name="TContext">The pipe context type being retried.</typeparam>
public class InfinitRetryPolicyContext<TContext> :
    BaseRetryPolicyContext<TContext>
    where TContext : class, PipeContext
{
    // Retry count given to the first retry context created for a failure.
    const int InitialRetryCount = 0;

    readonly InfinitRetryPolicy _retryPolicy;

    /// <summary>
    /// Creates the policy context.
    /// </summary>
    /// <param name="policy">The owning retry policy.</param>
    /// <param name="context">The pipe context the policy is applied to.</param>
    public InfinitRetryPolicyContext(InfinitRetryPolicy policy, TContext context)
        : base(policy, context)
    {
        _retryPolicy = policy;
    }

    /// <summary>
    /// Builds the initial retry context for <paramref name="exception"/>, starting at retry count zero.
    /// </summary>
    protected override RetryContext<TContext> CreateRetryContext(Exception exception, CancellationToken cancellationToken)
        => new InfinitRetryContext<TContext>(_retryPolicy, Context, exception, InitialRetryCount, cancellationToken);
}
/// <summary>
/// Retry context for <see cref="InfinitRetryPolicy"/>. Each instance represents one
/// attempt; <c>CanRetry</c> always prepares a follow-up context and permits the retry
/// whenever the policy handles the new exception, with no upper bound on the count.
/// </summary>
/// <typeparam name="TContext">The pipe context type being retried.</typeparam>
public class InfinitRetryContext<TContext> :
    BaseRetryContext<TContext>,
    RetryContext<TContext>
    where TContext : class, PipeContext
{
    readonly InfinitRetryPolicy _retryPolicy;

    /// <summary>
    /// Creates a retry context for a single attempt.
    /// </summary>
    /// <param name="policy">The policy supplying the delay and the exception filter.</param>
    /// <param name="context">The pipe context being retried.</param>
    /// <param name="exception">The exception associated with this context.</param>
    /// <param name="retryCount">Retry count recorded for this context.</param>
    /// <param name="cancellationToken">Token forwarded to the base retry context.</param>
    public InfinitRetryContext(
        InfinitRetryPolicy policy,
        TContext context,
        Exception exception,
        int retryCount,
        CancellationToken cancellationToken)
        : base(context, exception, retryCount, cancellationToken)
    {
        _retryPolicy = policy;
    }

    /// <summary>The policy's fixed interval, regardless of the retry count.</summary>
    public override TimeSpan? Delay
    {
        get { return _retryPolicy.Interval; }
    }

    /// <summary>
    /// Prepares the context for the next attempt and reports whether the policy
    /// handles <paramref name="exception"/>.
    /// </summary>
    bool RetryContext<TContext>.CanRetry(Exception exception, out RetryContext<TContext> retryContext)
    {
        var nextRetryCount = RetryCount + 1;

        // NOTE(review): the follow-up context carries this context's stored Exception,
        // not the exception argument; only the handled-check below uses the argument.
        retryContext = new InfinitRetryContext<TContext>(
            _retryPolicy,
            Context,
            Exception,
            nextRetryCount,
            CancellationToken);

        return _retryPolicy.IsHandled(exception);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment