Last active
October 26, 2023 19:53
-
-
Save joelverhagen/bbf0bdd91cfcdb5784abf135a859a108 to your computer and use it in GitHub Desktop.
Retry conflicts with Azure SDK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Table go! | |
Entity go! | |
Update with If-Match go! | |
Fake error! (ServiceUnavailable) | |
Success! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics.CodeAnalysis; | |
using System.Net; | |
using System.Text.Json; | |
using Azure; | |
using Azure.Core; | |
using Azure.Core.Pipeline; | |
using Azure.Data.Tables; | |
// Route every request through TestHandler so a failure can be injected, and plug in the
// conflict-aware retry policy under test.
var faultInjector = new TestHandler { InnerHandler = new SocketsHttpHandler() };
var clientOptions = new TableClientOptions
{
    Transport = new HttpClientTransport(faultInjector),
    RetryPolicy = new MyRetryPolicy(),
};
var tables = new TableServiceClient("UseDevelopmentStorage=true", clientOptions);
var table = tables.GetTableClient("oopsie");

// setup: make sure the table exists and holds a known entity whose ETag we remember
Console.WriteLine("Table go!");
await table.CreateIfNotExistsAsync();

Console.WriteLine("Entity go!");
var entity = new TableEntity("pk", "rk")
{
    ["State"] = "Active",
    ["HitCount"] = 1,
};
var upsertResponse = await table.UpsertEntityAsync(entity, TableUpdateMode.Replace);
entity.ETag = upsertResponse.Headers.ETag!.Value;

// repro: conditional update whose first attempt is answered with an injected error
Console.WriteLine("Update with If-Match go!");
entity["HitCount"] = 2;
try
{
    // Attach the table client and the entity to the outgoing message so the retry policy
    // can look them up under the "TableContext" key.
    var messageProperties = new Dictionary<string, object?>
    {
        ["TableContext"] = new TableContext(table, entity),
    };

    using (HttpPipeline.CreateHttpMessagePropertiesScope(messageProperties))
    {
        await table.UpdateEntityAsync(entity, entity.ETag, TableUpdateMode.Replace);
    }

    Console.WriteLine("Success!");
}
catch (RequestFailedException ex) when (ex.Status == 412)
{
    Console.WriteLine($"Oopsie. {ex.Status} {ex.ErrorCode}.");
}
/// <summary>
/// Pairs a table client with the entity being written. The program stores an instance in the
/// HTTP message properties under the "TableContext" key, where <c>MyRetryPolicy</c> reads it.
/// </summary>
/// <param name="Table">Client for the table that the entity is written to.</param>
/// <param name="Entity">Entity being written; its partition and row keys identify the remote copy.</param>
record TableContext(
    TableClient Table,
    ITableEntity Entity);
/// <summary>
/// Retry policy that suppresses a conflict response when the stored entity already matches
/// the request body.
/// </summary>
/// <remarks>
/// After the base policy declines to retry, a 409 (for POST) or 412 (for PUT) response is
/// inspected: the entity is read back from the table and compared with the JSON request body.
/// When both match, the response is replaced with a synthetic 204 so the caller sees success.
/// The check only runs when a <see cref="TableContext"/> is attached to the message under the
/// "TableContext" property key. It does not work for batch requests.
/// </remarks>
class MyRetryPolicy : RetryPolicy
{
    // Service-maintained keys that are left out of the local/remote comparison.
    private static readonly string[] IgnoredKeys = { "odata.etag", "odata.metadata", "Timestamp" };

    /// <summary>
    /// Defers to the base policy first; if it will not retry, checks whether a conflict
    /// response can be turned into a success because the remote entity equals the request body.
    /// </summary>
    /// <param name="message">The message whose request was just sent.</param>
    /// <param name="exception">The exception raised by the attempt, if any.</param>
    /// <returns>
    /// The base policy's decision, or <c>false</c> after the response was replaced with a
    /// synthetic 204.
    /// </returns>
    protected override async ValueTask<bool> ShouldRetryAsync(HttpMessage message, Exception? exception)
    {
        var shouldRetry = await base.ShouldRetryAsync(message, exception);

        // HttpMessage.Response throws InvalidOperationException when no response has been set
        // (for example when the attempt failed with an exception), so check HasResponse first.
        if (shouldRetry || !message.HasResponse)
        {
            return shouldRetry;
        }

        var conflictStatusCode = GetConflictStatusCode(message.Request.Method);
        var requestContent = message.Request.Content;
        if (conflictStatusCode is null
            || message.Response.Status != (int)conflictStatusCode.Value
            || requestContent is null
            || !message.TryGetProperty("TableContext", out var contextObj)
            || contextObj is not TableContext context)
        {
            return shouldRetry;
        }

        using var buffer = new MemoryStream();
        // this copies the bytes -- would be nice to be able to "try get internal buffer" as a fast path
        await requestContent.WriteToAsync(buffer, message.CancellationToken);
        var bytes = new ReadOnlyMemory<byte>(buffer.GetBuffer(), 0, (int)buffer.Length);
        var local = new BinaryData(bytes).ToObjectFromJson<Dictionary<string, JsonElement>>();

        Response rawResponse;
        try
        {
            // does not work for batch requests
            var diffResponse = await context.Table.GetEntityAsync<TableEntity>(
                context.Entity.PartitionKey,
                context.Entity.RowKey,
                cancellationToken: message.CancellationToken);
            rawResponse = diffResponse.GetRawResponse();
        }
        catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
        {
            // The entity no longer exists, so it cannot match the request body. Keep the
            // original conflict response instead of surfacing this nested 404.
            return shouldRetry;
        }

        var remote = rawResponse.Content.ToObjectFromJson<Dictionary<string, JsonElement>>();

        // A JSON "null" body deserializes to null; without two objects nothing can be proven equal.
        if (local is null || remote is null || HasEntityChanged(remote, local))
        {
            return shouldRetry;
        }

        // NOTE: the synthetic response carries no headers, so callers will not find an ETag on it.
        var conflictResponse = message.Response;
        message.Response = new FakeResponse((int)HttpStatusCode.NoContent, conflictResponse.ClientRequestId);
        conflictResponse.Dispose(); // the replaced response is no longer reachable through the message
        return false;
    }

    /// <summary>
    /// Maps a request method to the status code treated as a conflict for it:
    /// 409 for POST, 412 for PUT, and <c>null</c> for every other method.
    /// </summary>
    private static HttpStatusCode? GetConflictStatusCode(RequestMethod method)
    {
        if (method == RequestMethod.Post)
        {
            return HttpStatusCode.Conflict;
        }

        if (method == RequestMethod.Put)
        {
            return HttpStatusCode.PreconditionFailed;
        }

        return null;
    }

    /// <summary>
    /// Header-less, body-less response used to replace a conflict response with a success.
    /// </summary>
    private class FakeResponse : Response
    {
        public FakeResponse(int status, string clientRequestId)
        {
            Status = status;
            ClientRequestId = clientRequestId;
        }

        public override int Status { get; }

        public override string ReasonPhrase { get; } = string.Empty;

        public override Stream? ContentStream { get; set; }

        public override string ClientRequestId { get; set; }

        public override void Dispose()
        {
        }

        protected override bool ContainsHeader(string name)
        {
            return false;
        }

        protected override IEnumerable<HttpHeader> EnumerateHeaders()
        {
            return Enumerable.Empty<HttpHeader>();
        }

        protected override bool TryGetHeader(string name, [NotNullWhen(true)] out string? value)
        {
            value = null;
            return false;
        }

        protected override bool TryGetHeaderValues(string name, [NotNullWhen(true)] out IEnumerable<string>? values)
        {
            values = null;
            return false;
        }
    }

    /// <summary>
    /// Compares two JSON property bags, ignoring the keys in <see cref="IgnoredKeys"/>.
    /// </summary>
    /// <param name="remote">Properties read back from the table.</param>
    /// <param name="local">Properties taken from the request body.</param>
    /// <returns>
    /// <c>true</c> when a key is missing on either side or any value differs; otherwise <c>false</c>.
    /// </returns>
    private static bool HasEntityChanged(Dictionary<string, JsonElement> remote, Dictionary<string, JsonElement> local)
    {
        var keys = local.Keys.Union(remote.Keys).Except(IgnoredKeys);
        foreach (var key in keys)
        {
            if (!local.TryGetValue(key, out var localValue)
                || !remote.TryGetValue(key, out var remoteValue)
                || localValue.ValueKind != remoteValue.ValueKind)
            {
                return true;
            }

            switch (localValue.ValueKind)
            {
                case JsonValueKind.String:
                    // Compare decoded values: the same string may be escaped differently
                    // on each side (e.g. "\u00E9" versus a literal character).
                    if (!string.Equals(localValue.GetString(), remoteValue.GetString(), StringComparison.Ordinal))
                    {
                        return true;
                    }
                    break;
                case JsonValueKind.Number:
                    if (localValue.GetRawText() != remoteValue.GetRawText())
                    {
                        return true;
                    }
                    break;
                case JsonValueKind.True:
                case JsonValueKind.False:
                case JsonValueKind.Undefined:
                case JsonValueKind.Null:
                    // The kinds were already found equal above, which settles these cases.
                    break;
                default:
                    // Objects and arrays: be conservative and only treat byte-identical JSON
                    // as unchanged, rather than throwing from inside the pipeline.
                    if (localValue.GetRawText() != remoteValue.GetRawText())
                    {
                        return true;
                    }
                    break;
            }
        }

        return false;
    }
}
/// <summary>
/// Delegating handler that forwards every request to the inner handler and then replaces the
/// response of the second PUT against the pk/rk entity with a 503, so the client sees a
/// failure for a request that was actually sent.
/// </summary>
class TestHandler : DelegatingHandler
{
    // Path suffix identifying requests that address the entity with PartitionKey 'pk' and RowKey 'rk'.
    private const string EntityPathSuffix = "(PartitionKey='pk',RowKey='rk')";

    // Number of PUT requests seen so far for the entity above.
    private int _requestCount;

    /// <summary>
    /// Sends the request through the inner handler, then swaps in a fake 503 response when this
    /// is the second PUT for the tracked entity.
    /// </summary>
    /// <param name="request">The outgoing request.</param>
    /// <param name="cancellationToken">Token passed through to the inner handler.</param>
    /// <returns>The real response, or the fake 503 for the second matching PUT.</returns>
    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var response = await base.SendAsync(request, cancellationToken);

        if (request.Method == HttpMethod.Put
            && request.RequestUri!.AbsolutePath.EndsWith(EntityPathSuffix, StringComparison.Ordinal)
            && Interlocked.Increment(ref _requestCount) == 2)
        {
            // The real response is being discarded; dispose it so it is not leaked.
            response.Dispose();

            var statusCode = HttpStatusCode.ServiceUnavailable;
            Console.WriteLine($"Fake error! ({statusCode})");
            return new HttpResponseMessage(statusCode) { RequestMessage = request };
        }

        return response;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment