Skip to content

Instantly share code, notes, and snippets.

@joelverhagen
Last active October 26, 2023 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joelverhagen/bbf0bdd91cfcdb5784abf135a859a108 to your computer and use it in GitHub Desktop.
Save joelverhagen/bbf0bdd91cfcdb5784abf135a859a108 to your computer and use it in GitHub Desktop.
Retry conflicts with Azure SDK
Table go!
Entity go!
Update with If-Match go!
Fake error! (ServiceUnavailable)
Success!
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Text.Json;
using Azure;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Data.Tables;
// Build a table client whose transport goes through TestHandler and whose retries go through MyRetryPolicy.
var faultInjectingHandler = new TestHandler { InnerHandler = new SocketsHttpHandler() };
var clientOptions = new TableClientOptions
{
    Transport = new HttpClientTransport(faultInjectingHandler),
    RetryPolicy = new MyRetryPolicy(),
};
var tableService = new TableServiceClient("UseDevelopmentStorage=true", clientOptions);
var tableClient = tableService.GetTableClient("oopsie");

// Setup: make sure the table exists and seed one entity, remembering the ETag the service hands back.
Console.WriteLine("Table go!");
await tableClient.CreateIfNotExistsAsync();

Console.WriteLine("Entity go!");
var seededEntity = new TableEntity("pk", "rk")
{
    ["State"] = "Active",
    ["HitCount"] = 1,
};
var upsertResponse = await tableClient.UpsertEntityAsync(seededEntity, TableUpdateMode.Replace);
seededEntity.ETag = upsertResponse.Headers.ETag!.Value;

// Repro: conditional (If-Match) replace of the seeded entity.
Console.WriteLine("Update with If-Match go!");
seededEntity["HitCount"] = 2;
try
{
    // Everything sent inside this scope carries the TableContext as a message property,
    // which is where MyRetryPolicy looks it up.
    var messageProperties = new Dictionary<string, object?>
    {
        ["TableContext"] = new TableContext(tableClient, seededEntity),
    };

    using (HttpPipeline.CreateHttpMessagePropertiesScope(messageProperties))
    {
        await tableClient.UpdateEntityAsync(seededEntity, seededEntity.ETag, TableUpdateMode.Replace);
    }

    Console.WriteLine("Success!");
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.PreconditionFailed)
{
    Console.WriteLine($"Oopsie. {ex.Status} {ex.ErrorCode}.");
}
/// <summary>
/// Pairs a table client with the entity being written. The script stores an instance under the
/// "TableContext" HTTP message property so that <c>MyRetryPolicy</c> can read it back.
/// </summary>
/// <param name="Table">Table client; the retry policy uses it to re-read the entity.</param>
/// <param name="Entity">Entity being written; the retry policy reads its PartitionKey and RowKey.</param>
record TableContext(TableClient Table, ITableEntity Entity);
/// <summary>
/// Retry policy that turns a conflict response into a success when the entity stored in the table
/// already matches the request body.
/// </summary>
/// <remarks>
/// After the base policy decides not to retry, a POST answered with 409 or a PUT answered with 412
/// is re-examined: the entity named by the "TableContext" message property is fetched and compared,
/// property by property, with the JSON body that was sent. If nothing differs, the response on the
/// message is replaced by a synthetic 204 so the caller observes a success.
/// </remarks>
class MyRetryPolicy : RetryPolicy
{
    // Service-generated properties that never appear in (or never match) the request body.
    private static readonly string[] s_ignoredKeys = { "odata.etag", "odata.metadata", "Timestamp" };

    /// <summary>
    /// Defers to the base policy, then checks whether a non-retried conflict can be reported as success.
    /// </summary>
    /// <param name="message">The message whose request was just attempted.</param>
    /// <param name="exception">The exception thrown by the attempt, if any.</param>
    /// <returns>
    /// The base policy's decision, or <c>false</c> after the response has been swapped for a synthetic 204.
    /// </returns>
    protected override async ValueTask<bool> ShouldRetryAsync(HttpMessage message, Exception? exception)
    {
        var shouldRetry = await base.ShouldRetryAsync(message, exception);

        // HttpMessage.Response throws when the attempt ended without a response (transport failure),
        // so it must only be read behind HasResponse.
        int? statusCode = message.HasResponse ? message.Response.Status : (int?)null;

        HttpStatusCode? conflictStatusCode = null;
        if (message.Request.Method == RequestMethod.Post)
        {
            conflictStatusCode = HttpStatusCode.Conflict;
        }
        else if (message.Request.Method == RequestMethod.Put)
        {
            conflictStatusCode = HttpStatusCode.PreconditionFailed;
        }

        if (!shouldRetry
            && conflictStatusCode.HasValue
            && statusCode == (int)conflictStatusCode.Value
            && message.Request.Content is not null
            && message.TryGetProperty("TableContext", out var contextObj)
            && contextObj is TableContext context)
        {
            using var requestContent = new MemoryStream();
            // this copies the bytes -- would be nice to be able to "try get internal buffer" as a fast path
            await message.Request.Content.WriteToAsync(requestContent, message.CancellationToken);
            var bytes = new ReadOnlyMemory<byte>(requestContent.GetBuffer(), 0, (int)requestContent.Length);
            var local = new BinaryData(bytes).ToObjectFromJson<Dictionary<string, JsonElement>>();

            // does not work for batch requests
            Dictionary<string, JsonElement>? remote;
            try
            {
                var diffResponse = await context.Table.GetEntityAsync<TableEntity>(
                    context.Entity.PartitionKey,
                    context.Entity.RowKey,
                    cancellationToken: message.CancellationToken);
                remote = diffResponse.GetRawResponse().Content.ToObjectFromJson<Dictionary<string, JsonElement>>();
            }
            catch (RequestFailedException)
            {
                // The stored state cannot be confirmed (e.g. the entity no longer exists),
                // so leave the original conflict response in place for the caller.
                return shouldRetry;
            }

            if (local is not null && remote is not null && !HasEntityChanged(remote, local))
            {
                // Swap in the synthetic success, then release the conflict response it replaces.
                var conflictResponse = message.Response;
                message.Response = new FakeResponse((int)HttpStatusCode.NoContent, conflictResponse.ClientRequestId);
                conflictResponse.Dispose();
                return false;
            }
        }

        return shouldRetry;
    }

    /// <summary>
    /// Determines whether the stored entity differs from the entity that was sent.
    /// </summary>
    /// <param name="remote">Properties parsed from the entity read back from the table.</param>
    /// <param name="local">Properties parsed from the request body.</param>
    /// <returns><c>true</c> when any compared property is missing on one side or has a different value.</returns>
    /// <exception cref="NotImplementedException">A property holds a JSON object or array.</exception>
    private static bool HasEntityChanged(Dictionary<string, JsonElement> remote, Dictionary<string, JsonElement> local)
    {
        var keys = local.Keys.Union(remote.Keys).Except(s_ignoredKeys);
        foreach (var key in keys)
        {
            if (!local.TryGetValue(key, out var localValue)
                || !remote.TryGetValue(key, out var remoteValue)
                || localValue.ValueKind != remoteValue.ValueKind)
            {
                return true;
            }

            switch (localValue.ValueKind)
            {
                case JsonValueKind.String:
                    // Compare decoded text: two serializers may escape the same string differently.
                    if (!string.Equals(localValue.GetString(), remoteValue.GetString(), StringComparison.Ordinal))
                    {
                        return true;
                    }
                    break;
                case JsonValueKind.Number:
                    if (!AreNumbersEqual(localValue, remoteValue))
                    {
                        return true;
                    }
                    break;
                case JsonValueKind.True:
                case JsonValueKind.False:
                case JsonValueKind.Undefined:
                case JsonValueKind.Null:
                    // Equal ValueKind already implies equal value for these kinds.
                    break;
                default:
                    throw new NotImplementedException($"Comparing JSON values of kind {localValue.ValueKind} is not implemented.");
            }
        }

        return false;
    }

    /// <summary>
    /// Compares two JSON numbers by value so that formatting differences such as 1 and 1.0 are not
    /// reported as a change.
    /// </summary>
    private static bool AreNumbersEqual(JsonElement left, JsonElement right)
    {
        if (left.GetRawText() == right.GetRawText())
        {
            return true;
        }

        return left.TryGetDouble(out var leftNumber)
            && right.TryGetDouble(out var rightNumber)
            && leftNumber == rightNumber;
    }

    /// <summary>
    /// Minimal header-less, body-less response used to report a synthetic status code.
    /// </summary>
    private sealed class FakeResponse : Response
    {
        public FakeResponse(int status, string clientRequestId)
        {
            Status = status;
            ClientRequestId = clientRequestId;
        }

        public override int Status { get; }

        public override string ReasonPhrase { get; } = string.Empty;

        public override Stream? ContentStream { get; set; }

        public override string ClientRequestId { get; set; }

        public override void Dispose()
        {
            // Nothing to release: this response owns no stream or connection.
        }

        protected override bool ContainsHeader(string name)
        {
            return false;
        }

        protected override IEnumerable<HttpHeader> EnumerateHeaders()
        {
            return Enumerable.Empty<HttpHeader>();
        }

        protected override bool TryGetHeader(string name, [NotNullWhen(true)] out string? value)
        {
            value = null;
            return false;
        }

        protected override bool TryGetHeaderValues(string name, [NotNullWhen(true)] out IEnumerable<string>? values)
        {
            values = null;
            return false;
        }
    }
}
/// <summary>
/// Test handler that forwards every request to the inner handler and, for the second PUT against
/// the entity (PartitionKey='pk', RowKey='rk'), returns a 503 in place of the real response.
/// </summary>
/// <remarks>
/// The request is always sent to the inner handler first, so the faked failure is returned only
/// after the real request has already been processed.
/// </remarks>
class TestHandler : DelegatingHandler
{
    private const string TargetEntityPathSuffix = "(PartitionKey='pk',RowKey='rk')";

    // Number of PUT requests seen for the target entity; incremented atomically.
    private int _requestCount;

    /// <summary>
    /// Sends the request through the inner handler and substitutes a fake 503 on the second matching PUT.
    /// </summary>
    /// <param name="request">The outgoing request.</param>
    /// <param name="cancellationToken">Token passed through to the inner handler.</param>
    /// <returns>The inner handler's response, or a fake <see cref="HttpStatusCode.ServiceUnavailable"/> response.</returns>
    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var response = await base.SendAsync(request, cancellationToken);

        var isTargetEntityPut = request.Method == HttpMethod.Put
            && request.RequestUri is { } uri
            && uri.AbsolutePath.EndsWith(TargetEntityPathSuffix, StringComparison.Ordinal);

        if (isTargetEntityPut && Interlocked.Increment(ref _requestCount) == 2)
        {
            var statusCode = HttpStatusCode.ServiceUnavailable;
            Console.WriteLine($"Fake error! ({statusCode})");

            // The real response is being discarded; release it instead of leaking it.
            response.Dispose();
            return new HttpResponseMessage(statusCode) { RequestMessage = request };
        }

        return response;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment