Skip to content

Instantly share code, notes, and snippets.

@bpatra
Created December 5, 2020 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bpatra/d9939abc7e2bad936493955cb93066c3 to your computer and use it in GitHub Desktop.
Save bpatra/d9939abc7e2bad936493955cb93066c3 to your computer and use it in GitHub Desktop.
public class DocumentClientWrapped : IDocumentClientWrapped
{
private class RetriableEnumerable<T> : IEnumerable<T>
{
private readonly IEnumerable<T> _t;
public RetriableEnumerable(IEnumerable<T> t)
{
_t = t;
}
public IEnumerator<T> GetEnumerator()
{
return new RetriableEnumerator<T>(ExecuteWithRetry(() => _t.GetEnumerator()));
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}
private class RetriableEnumerator<T> : IEnumerator<T>
{
private IEnumerator<T> _t;
public RetriableEnumerator(IEnumerator<T> t)
{
_t = t;
}
public T Current
{
get
{
return ExecuteWithRetry(()=> _t.Current);
}
}
object IEnumerator.Current
{
get
{
return ExecuteWithRetry(() => _t.Current);
}
}
public void Dispose()
{
_t.Dispose();
}
public bool MoveNext()
{
return ExecuteWithRetry(() => _t.MoveNext());
}
public void Reset()
{
_t.Reset();
}
}
private class RetryQueryable<T> : IRetryQueryable<T>
{
private readonly IQueryable<T> _queryable;
public RetryQueryable(IQueryable<T> queryable)
{
_queryable = queryable;
}
public IDocumentQuery<T> AsDocumentQuery()
{
return this._queryable.AsDocumentQuery();
}
public IEnumerable<T> AsRetryEnumerable()
{
return new RetriableEnumerable<T>(this._queryable.AsEnumerable());
}
public IRetryQueryable<T> Where(Expression<Func<T, bool>> predicate)
{
var queryable = this._queryable.Where(predicate);
return new RetryQueryable<T>(queryable);
}
public IRetryQueryable<TResult> SelectMany<TResult>(Expression<Func<T, IEnumerable<TResult>>> predicate)
{
var queryable = this._queryable.SelectMany(predicate);
return new RetryQueryable<TResult>(queryable);
}
public IRetryQueryable<TResult> Select<TResult>(Expression<Func<T, TResult>> predicate)
{
var queryable = this._queryable.Select(predicate);
return new RetryQueryable<TResult>(queryable);
}
}
private const int MaxRetryCount = 10;
private static async Task<V> ExecuteWithRetriesAsync<V>(Func<Task<V>> function)
{
TimeSpan sleepTime = TimeSpan.FromSeconds(1.0);
int count = 0;
while (true)
{
try
{
return await function();
}
catch (DocumentClientException de)
{
if ((int)de.StatusCode != 429)
{
throw;
}
if (++count > MaxRetryCount)
{
throw new MaxRetryException(de, count);
}
Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge);
sleepTime = de.RetryAfter;
}
catch (AggregateException ae)
{
if (!(ae.InnerException is DocumentClientException))
{
throw;
}
DocumentClientException de = (DocumentClientException)ae.InnerException;
if ((int)de.StatusCode != 429)
{
throw;
}
if (++count > MaxRetryCount)
{
throw new MaxRetryException(de, count);
}
Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge);
sleepTime = de.RetryAfter;
}
await Task.Delay(sleepTime);
}
}
private static V ExecuteWithRetry<V>(Func<V> function)
{
TimeSpan sleepTime = TimeSpan.FromSeconds(1.0);
int count = 0;
while (true)
{
try
{
return function();
}
catch (DocumentClientException de)
{
if ((int)de.StatusCode != 429)
{
throw;
}
if (++count > MaxRetryCount)
{
throw new MaxRetryException(de, count);
}
Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge);
sleepTime = de.RetryAfter;
}
catch (AggregateException ae)
{
if (!(ae.InnerException is DocumentClientException))
{
throw;
}
DocumentClientException de = (DocumentClientException)ae.InnerException;
if ((int)de.StatusCode != 429)
{
throw;
}
if (++count > MaxRetryCount)
{
throw new MaxRetryException(de, count);
}
Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge);
sleepTime = de.RetryAfter;
}
Thread.Sleep(sleepTime);
}
}
private readonly DocumentClient _client;
public DocumentClientWrapped(string endpointUrl, string authorizationKey)
{
_client = new DocumentClient(new Uri(endpointUrl), authorizationKey);
}
public async Task CreateDocumentAsync(Uri documentCollectionUri, object obj)
{
ResourceResponse<Document> response = await ExecuteWithRetriesAsync(()=> _client.CreateDocumentAsync(documentCollectionUri, obj));
LogResponse<Document>("CreateDocumentAsync", response);/* Do something with the response if you want to use it */
}
public async Task DeleteDocumentAsync(Uri documentUri)
{
ResourceResponse<Document> response = await ExecuteWithRetriesAsync( () => _client.DeleteDocumentAsync(documentUri));
LogResponse<Document>("DeleteDocumentAsync", response); /* Do something with the response if you want to use it */
}
public async Task ReplaceDocumentAsync(Uri documentCollectionUri, object document)
{
var response = await ExecuteWithRetriesAsync(()=> _client.ReplaceDocumentAsync(documentCollectionUri, document));
LogResponse<Document>("ReplaceDocumentAsync", response);
}
public IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, SqlQuerySpec sqlSpec)
{
var queryable = _client.CreateDocumentQuery<T>(documentCollectionUri, sqlSpec);
return new RetryQueryable<T>(queryable);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment