.NET Event Hubs Client: Retry with Timeout Concept (Second Preview)

Because Event Hubs is based on the AMQP protocol, there is no request/response cycle that defines an operation. Both the Event Hubs client and service make use of an operation timeout as a means of coordination for managing their resources. When communicating with the Event Hubs service, the client library is required to specify an operation timeout on the AMQP link.

If an operation has not been completed within this timeout period, the client library will consider it a transient failure and apply its retry policy. Likewise, if the server is expecting communication from the client, such as a message acknowledgment, that does not occur within the timeout period, it will consider the operation to have failed.
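
The client-side behavior described above can be sketched as a simple loop. This is an illustration only, not the library's implementation: `RetrySketch`, `RunWithRetryAsync`, the `attempt` delegate, and the `calculateDelay` callback are hypothetical names, and a `TimeoutException` stands in for an operation that did not complete within the timeout period.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Hypothetical helper: runs an operation, treats a timeout as a transient
    // failure, and asks a callback whether (and how long) to wait before retrying.
    public static async Task<T> RunWithRetryAsync<T>(
        Func<Task<T>> attempt,
        Func<Exception, int, TimeSpan?> calculateDelay)
    {
        var retryCount = 0;

        while (true)
        {
            try
            {
                return await attempt().ConfigureAwait(false);
            }
            catch (TimeoutException ex)
            {
                // The attempt timed out; consult the callback for the next delay.
                ++retryCount;
                TimeSpan? delay = calculateDelay(ex, retryCount);

                if (!delay.HasValue)
                {
                    // A null delay means "do not retry"; surface the timeout.
                    throw;
                }

                await Task.Delay(delay.Value).ConfigureAwait(false);
            }
        }
    }
}
```

In this sketch, any exception other than a timeout propagates immediately; a real policy would likely classify more failure types as transient.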

Operation Timeouts and Cancellations, Oh My!

The guidance for .NET design has been to strongly prefer cancellation tokens with timeout over explicit timeouts. However, this presents a challenge in the case of Event Hubs, where the client/service protocol requires a timeout. One challenge is that there is no means to read the remaining time from a CancellationToken instance. Another is that, in many cases, the timeout value must be supplied at the AMQP level when a client is created rather than when an operation is requested. (nit: this is slightly inaccurate due to lazy creation of AMQP links by the client, but close enough.)

In discussion with the Event Hubs service team, Jeff Richter put forth a proposal to include the operation timeout as part of the retry options, allowing it to be specified at the time of client creation. These operation timeouts would act as secondary governance used for the client/service timeout, with cancellation tokens being honored as the primary, allowing callers to remain in control of cancellation.
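
One way to picture the "primary" and "secondary" roles is a linked cancellation source: the caller's token can cancel at any time, while the per-try timeout only bounds a single attempt. The sketch below is an assumption about how this could be wired up, not the proposed implementation; `TryTimeoutSketch`, `RunAttemptAsync`, and the `attempt` delegate are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TryTimeoutSketch
{
    // Hypothetical helper: bounds one attempt by tryTimeout while still
    // honoring the caller's token as the primary means of cancellation.
    public static async Task<T> RunAttemptAsync<T>(
        Func<CancellationToken, Task<T>> attempt,
        TimeSpan tryTimeout,
        CancellationToken callerToken)
    {
        using (var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(callerToken))
        {
            // The linked token is cancelled when the caller cancels or when
            // the per-try timer elapses, whichever happens first.
            linkedSource.CancelAfter(tryTimeout);

            try
            {
                return await attempt(linkedSource.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (!callerToken.IsCancellationRequested)
            {
                // Only the per-try timer fired: report a timeout so that a
                // retry policy can treat it as a transient failure.
                throw new TimeoutException($"The attempt did not complete within {tryTimeout}.");
            }
        }
    }
}
```

If the caller's token is the one that was cancelled, the `OperationCanceledException` propagates unchanged, so callers see cancellation rather than a timeout.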

Concept to Consider for .NET in Track 2

What follows is an informal design concept, based on different conversations with Krzysztof, Jeff, the Event Hubs SDK feature team, and the Event Hubs service team. The approach, names, comments, and other artifacts are intended as an illustration for discussion, not a final design.

RetryOptions with RetryPolicy

public enum RetryMode
{
    Fixed,
    Exponential
}

public class RetryOptions
{
    /// <summary>The retry approach to apply.</summary>
    public RetryMode RetryMode { get; set; } = RetryMode.Exponential;
    
    /// <summary>The maximum number of retry attempts to make.</summary>
    public int MaximumRetries { get; set; } 
    
    /// <summary>The delay between retry attempts or the base of an exponential backoff.</summary>
    public TimeSpan Delay { get; set; }
    
    /// <summary>The maximum duration permissible between retry attempts.</summary>
    public TimeSpan MaximumDelay { get; set; }
    
    /// <summary>The maximum duration to wait for completion of a single attempt to perform an Event Hubs service operation.</summary>
    public TimeSpan TryTimeout { get; set; }
}

public abstract class EventHubsRetryPolicy
{
    /// <returns>The delay before retrying; if <c>null</c>, then do not retry.</returns>
    public abstract TimeSpan? CalculateRetryDelay(Exception lastException,
                                                  int retryCount);
}

internal class DefaultRetryPolicy : EventHubsRetryPolicy
{
    public RetryOptions RetryOptions { get; }
    
    public DefaultRetryPolicy(RetryOptions retryOptions) { ... }

    public override TimeSpan? CalculateRetryDelay(Exception lastException,
                                                  int retryCount) { ... }
}

public class EventHubClientOptions
{
    ...
    public RetryOptions RetryOptions { get; set; }
    ...
}

public class EventHubClient
{
    ...
    public EventHubsRetryPolicy RetryPolicy { get; set; }
    ...
}

// Pattern repeats for the EventHubConsumer and EventHubProducer.  RetryOptions may be
// specified as part of the options for each, and a public property allows developers to
// set a custom implementation.
  • This preserves the extension point available in track one and allows developers to offer a custom implementation.

  • This removes exposure of the internal retry policy implementation in favor of an interface (abstract class), which may be defined by Azure.Core; ideally, both the options and the interface could be sourced from Azure.Core.

  • This allows developers to use a custom policy for advanced scenarios.

  • This requires us to expose the RetryPolicy on the client and allow for read/write semantics; I'd prefer to specify it at the time of creation only. (I don't really have a good, solid justification. Just a preference.)
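
To make the intent of the options more concrete, here is a minimal sketch of how a default policy might turn them into a delay. The formula is an assumption and not taken from the concept: it doubles `Delay` on each retry in exponential mode, caps the result at `MaximumDelay` in both modes, ignores the exception, and omits jitter. `RetryDelaySketch` is a hypothetical name, and `RetryMode` is repeated only so that the sketch compiles on its own.

```csharp
using System;

public enum RetryMode { Fixed, Exponential }

public static class RetryDelaySketch
{
    // Hypothetical calculation; retryCount is assumed to start at 1 for the
    // first retry. Returns null when no further retry should be made.
    public static TimeSpan? CalculateRetryDelay(
        RetryMode mode,
        int maximumRetries,
        TimeSpan delay,
        TimeSpan maximumDelay,
        int retryCount)
    {
        if (retryCount > maximumRetries)
        {
            return null;
        }

        // Exponential mode doubles the base delay for each retry after the first.
        double seconds = mode == RetryMode.Exponential
            ? delay.TotalSeconds * Math.Pow(2, Math.Max(retryCount - 1, 0))
            : delay.TotalSeconds;

        // Never wait longer than the configured maximum.
        return TimeSpan.FromSeconds(Math.Min(seconds, maximumDelay.TotalSeconds));
    }
}
```

Under these assumptions, with a `Delay` of 1 second, a `MaximumDelay` of 5 seconds, and 5 `MaximumRetries`, the exponential mode yields delays of 1, 2, 4, 5, and 5 seconds, and a sixth retry returns `null`.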

What did .NET do in Track 1?

public abstract class RetryPolicy
{
    public static bool IsRetryableException(Exception exception) { ... }
    public static RetryPolicy Default => new RetryExponential(DefaultRetryMinBackoff, DefaultRetryMaxBackoff, DefaultRetryMaxCount);
    public static RetryPolicy NoRetry => new RetryExponential(TimeSpan.Zero, TimeSpan.Zero, 0);

    public TimeSpan? GetNextRetryInterval(Exception lastException, 
                                          TimeSpan remainingTime, 
                                          int retryCount) { ... }
    
    public abstract RetryPolicy Clone();
    protected abstract TimeSpan? OnGetNextRetryInterval(Exception lastException, 
                                                        TimeSpan remainingTime, 
                                                        int baseWaitTime, 
                                                        int retryCount);
}

public sealed class RetryExponential : RetryPolicy
{
    public RetryExponential(TimeSpan minimumBackoff, TimeSpan maximumBackoff, int maximumRetryCount) { ... }
    
    public override RetryPolicy Clone() { ... }
    protected override TimeSpan? OnGetNextRetryInterval(Exception lastException, 
                                                        TimeSpan remainingTime, 
                                                        int baseWaitTimeSecs, 
                                                        int retryCount)  { ... }
}

public abstract class EventHubClient 
{
    // Retry policy is set to the default on creation with no option to
    // change or override values.  It must be set on the client after
    // construction.
    ...
    public RetryPolicy RetryPolicy { get; set; }
    ...
}

// Pattern repeats for the PartitionSender and PartitionReceiver.  Retry is defaulted at 
// construction and a public property allows developers to set new values or provide a 
// custom implementation.
//
// Under the Track 2 concept, the options for each client would no longer offer a DefaultTimeout property; it would
// be replaced by the timeout member of the retry options, and other timeout scenarios would be communicated via the CancellationToken.

What do the other languages do in Track 2?

The current implementations for track two of the other languages do not yet include the operation timeout on the retry policy. To my knowledge, there are no planned changes to the retry approach outside of adding that property.

Java

public abstract class Retry {
    public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0);
    public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30);
    public static final int DEFAULT_MAX_RETRY_COUNT = 10;
    private static final int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4;
    
    public Retry(int maxRetryCount) { ... }

    public static boolean isRetriableException(Exception exception) { ... }
    public static Retry getNoRetry() { ... }
    public static Retry getDefaultRetry() { ... } 
    
    public int incrementRetryCount() { ... }
    public int getRetryCount() { ... } 
    public void resetRetryInterval() { ... } 
    public int getMaxRetryCount() { ... } 
    public Duration getNextRetryInterval(Exception lastException, Duration remainingTime) { ... }
    
    protected abstract Duration calculateNextRetryInterval(Exception lastException, Duration remainingTime,
                                                           int baseWaitSeconds, int retryCount);
}

public class EventHubClientBuilder {
    // Contains static methods for building an EventHubClient.  The options
    // for each client are taken as loose arguments, including the
    // retry policy and operation timeout.
    
    private TokenCredential credentials;
    private Configuration configuration;
    private Duration timeout;
    private ProxyConfiguration proxyConfiguration;
    private Retry retry;
    private Scheduler scheduler;
    private TransportType transport;
    private String host;
    private String eventHubPath;
    ...
}

// Pattern repeats for the EventHubProducer and EventHubConsumer.  Retry is provided or set
// as part of the options for the client type.  It is then read-only.

TypeScript

export interface RetryOptions {
  retryCount?: number;
  retryInterval?: number;
  maxRetryInterval?: number;
  isExponential?: boolean;
}

export interface EventHubClientOptions {
  ...
  retryOptions?: RetryOptions;
  ...
}

// Retry options may be specified as part of the options for each client 
// (EventHubClient, EventHubProducer, EventHubConsumer) and are read-only after construction.
//
// TypeScript does not currently allow operation timeouts to be set by callers.  Callers may
// pass an abortSignal as a cancellation source, which allows a timeout duration after which 
// cancellation is requested.

Python

client = EventHubClient(
    host=HOSTNAME, 
    event_hub_path=EVENT_HUB, 
    credential=EventHubSharedKeyCredential(USER, KEY),
    max_retries=3,
    receive_timeout=0.50,
    send_timeout=0.50)
   
# Retries are fixed only; timeouts are specified on the client.
# There is no concept of cancellation; timeouts are used instead.