Created
December 5, 2020 22:44
-
-
Save bpatra/d9939abc7e2bad936493955cb93066c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DocumentClientWrapped : IDocumentClientWrapped | |
{ | |
private class RetriableEnumerable<T> : IEnumerable<T> | |
{ | |
private readonly IEnumerable<T> _t; | |
public RetriableEnumerable(IEnumerable<T> t) | |
{ | |
_t = t; | |
} | |
public IEnumerator<T> GetEnumerator() | |
{ | |
return new RetriableEnumerator<T>(ExecuteWithRetry(() => _t.GetEnumerator())); | |
} | |
IEnumerator IEnumerable.GetEnumerator() | |
{ | |
return this.GetEnumerator(); | |
} | |
} | |
private class RetriableEnumerator<T> : IEnumerator<T> | |
{ | |
private IEnumerator<T> _t; | |
public RetriableEnumerator(IEnumerator<T> t) | |
{ | |
_t = t; | |
} | |
public T Current | |
{ | |
get | |
{ | |
return ExecuteWithRetry(()=> _t.Current); | |
} | |
} | |
object IEnumerator.Current | |
{ | |
get | |
{ | |
return ExecuteWithRetry(() => _t.Current); | |
} | |
} | |
public void Dispose() | |
{ | |
_t.Dispose(); | |
} | |
public bool MoveNext() | |
{ | |
return ExecuteWithRetry(() => _t.MoveNext()); | |
} | |
public void Reset() | |
{ | |
_t.Reset(); | |
} | |
} | |
private class RetryQueryable<T> : IRetryQueryable<T> | |
{ | |
private readonly IQueryable<T> _queryable; | |
public RetryQueryable(IQueryable<T> queryable) | |
{ | |
_queryable = queryable; | |
} | |
public IDocumentQuery<T> AsDocumentQuery() | |
{ | |
return this._queryable.AsDocumentQuery(); | |
} | |
public IEnumerable<T> AsRetryEnumerable() | |
{ | |
return new RetriableEnumerable<T>(this._queryable.AsEnumerable()); | |
} | |
public IRetryQueryable<T> Where(Expression<Func<T, bool>> predicate) | |
{ | |
var queryable = this._queryable.Where(predicate); | |
return new RetryQueryable<T>(queryable); | |
} | |
public IRetryQueryable<TResult> SelectMany<TResult>(Expression<Func<T, IEnumerable<TResult>>> predicate) | |
{ | |
var queryable = this._queryable.SelectMany(predicate); | |
return new RetryQueryable<TResult>(queryable); | |
} | |
public IRetryQueryable<TResult> Select<TResult>(Expression<Func<T, TResult>> predicate) | |
{ | |
var queryable = this._queryable.Select(predicate); | |
return new RetryQueryable<TResult>(queryable); | |
} | |
} | |
private const int MaxRetryCount = 10; | |
private static async Task<V> ExecuteWithRetriesAsync<V>(Func<Task<V>> function) | |
{ | |
TimeSpan sleepTime = TimeSpan.FromSeconds(1.0); | |
int count = 0; | |
while (true) | |
{ | |
try | |
{ | |
return await function(); | |
} | |
catch (DocumentClientException de) | |
{ | |
if ((int)de.StatusCode != 429) | |
{ | |
throw; | |
} | |
if (++count > MaxRetryCount) | |
{ | |
throw new MaxRetryException(de, count); | |
} | |
Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge); | |
sleepTime = de.RetryAfter; | |
} | |
catch (AggregateException ae) | |
{ | |
if (!(ae.InnerException is DocumentClientException)) | |
{ | |
throw; | |
} | |
DocumentClientException de = (DocumentClientException)ae.InnerException; | |
if ((int)de.StatusCode != 429) | |
{ | |
throw; | |
} | |
if (++count > MaxRetryCount) | |
{ | |
throw new MaxRetryException(de, count); | |
} | |
Trace.TraceInformation("DocumentDB async retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge); | |
sleepTime = de.RetryAfter; | |
} | |
await Task.Delay(sleepTime); | |
} | |
} | |
private static V ExecuteWithRetry<V>(Func<V> function) | |
{ | |
TimeSpan sleepTime = TimeSpan.FromSeconds(1.0); | |
int count = 0; | |
while (true) | |
{ | |
try | |
{ | |
return function(); | |
} | |
catch (DocumentClientException de) | |
{ | |
if ((int)de.StatusCode != 429) | |
{ | |
throw; | |
} | |
if (++count > MaxRetryCount) | |
{ | |
throw new MaxRetryException(de, count); | |
} | |
Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge); | |
sleepTime = de.RetryAfter; | |
} | |
catch (AggregateException ae) | |
{ | |
if (!(ae.InnerException is DocumentClientException)) | |
{ | |
throw; | |
} | |
DocumentClientException de = (DocumentClientException)ae.InnerException; | |
if ((int)de.StatusCode != 429) | |
{ | |
throw; | |
} | |
if (++count > MaxRetryCount) | |
{ | |
throw new MaxRetryException(de, count); | |
} | |
Trace.TraceInformation("DocumentDB sync retry count: {0} - retry in: {1} - RUs: {2}", ++count, sleepTime.TotalMilliseconds, de.RequestCharge); | |
sleepTime = de.RetryAfter; | |
} | |
Thread.Sleep(sleepTime); | |
} | |
} | |
private readonly DocumentClient _client; | |
public DocumentClientWrapped(string endpointUrl, string authorizationKey) | |
{ | |
_client = new DocumentClient(new Uri(endpointUrl), authorizationKey); | |
} | |
public async Task CreateDocumentAsync(Uri documentCollectionUri, object obj) | |
{ | |
ResourceResponse<Document> response = await ExecuteWithRetriesAsync(()=> _client.CreateDocumentAsync(documentCollectionUri, obj)); | |
LogResponse<Document>("CreateDocumentAsync", response);/* Do something with the response if you want to use it */ | |
} | |
public async Task DeleteDocumentAsync(Uri documentUri) | |
{ | |
ResourceResponse<Document> response = await ExecuteWithRetriesAsync( () => _client.DeleteDocumentAsync(documentUri)); | |
LogResponse<Document>("DeleteDocumentAsync", response); /* Do something with the response if you want to use it */ | |
} | |
public async Task ReplaceDocumentAsync(Uri documentCollectionUri, object document) | |
{ | |
var response = await ExecuteWithRetriesAsync(()=> _client.ReplaceDocumentAsync(documentCollectionUri, document)); | |
LogResponse<Document>("ReplaceDocumentAsync", response); | |
} | |
public IRetryQueryable<T> CreateDocumentQuery<T>(Uri documentCollectionUri, SqlQuerySpec sqlSpec) | |
{ | |
var queryable = _client.CreateDocumentQuery<T>(documentCollectionUri, sqlSpec); | |
return new RetryQueryable<T>(queryable); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment