Skip to content

Instantly share code, notes, and snippets.

@rschiefer
Created August 14, 2020 15:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rschiefer/fec2684a428efb089ba399e7390600d0 to your computer and use it in GitHub Desktop.
Save rschiefer/fec2684a428efb089ba399e7390600d0 to your computer and use it in GitHub Desktop.
Retry OData Client calls with Polly in .NET 4.5
/// <summary>
/// OData client request message that sends through <see cref="HttpClient"/> and runs every
/// send attempt inside the supplied Polly retry policy.
/// </summary>
/// <remarks>
/// <c>requestMessage</c> is used purely as a template: it accumulates the URL, method and
/// request headers, but is never handed to <see cref="HttpClient"/>. Each attempt sends a fresh
/// copy (including the body), because a message instance cannot be sent twice.
/// NOTE(review): every instance creates its own <see cref="HttpClient"/> and nothing in this
/// class disposes it; consider sharing a client if socket usage becomes a concern.
/// </remarks>
public class HttpClientRequestMessage : DataServiceClientRequestMessage
{
    // Header names that live on HttpContent.Headers. HttpRequestMessage.Headers rejects them,
    // so they are cached and applied to the content of each attempt instead.
    private static readonly HashSet<string> ContentHeaderNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
    {
        "Allow",
        "Content-Disposition",
        "Content-Encoding",
        "Content-Language",
        "Content-Length",
        "Content-Location",
        "Content-MD5",
        "Content-Range",
        "Content-Type",
        "Expires",
        "Last-Modified"
    };

    private readonly HttpRequestMessage requestMessage;
    private readonly HttpClient client;
    private readonly HttpClientHandler handler;
    private readonly MemoryStream messageStream;
    private readonly Dictionary<string, string> contentHeaderValueCache;
    private readonly RetryPolicy<HttpResponseMessage> pollyPolicy;

    /// <summary>
    /// Creates a request message for the given HTTP method.
    /// </summary>
    /// <param name="actualMethod">HTTP method that is actually sent on the wire.</param>
    /// <param name="pollyPolicy">Retry policy wrapped around every send attempt.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pollyPolicy"/> is null.</exception>
    public HttpClientRequestMessage(string actualMethod, RetryPolicy<HttpResponseMessage> pollyPolicy)
        : base(actualMethod)
    {
        if (pollyPolicy == null)
        {
            // Fail here instead of with a NullReferenceException inside the send task.
            throw new ArgumentNullException("pollyPolicy");
        }

        this.requestMessage = new HttpRequestMessage();
        this.messageStream = new MemoryStream();
        this.handler = new HttpClientHandler();
        this.client = new HttpClient(this.handler, disposeHandler: true);
        this.contentHeaderValueCache = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        this.pollyPolicy = pollyPolicy;
    }

    /// <summary>
    /// Gets the request headers followed by the cached content headers.
    /// </summary>
    public override IEnumerable<KeyValuePair<string, string>> Headers
    {
        get
        {
            // The template message never carries content, so content headers always come
            // from the cache.
            return HttpHeadersToStringDictionary(this.requestMessage.Headers).Concat(this.contentHeaderValueCache);
        }
    }

    /// <summary>
    /// Gets or sets the request URL.
    /// </summary>
    public override Uri Url
    {
        get { return this.requestMessage.RequestUri; }
        set { this.requestMessage.RequestUri = value; }
    }

    /// <summary>
    /// Gets or sets the HTTP method. The value is replaced by <c>ActualMethod</c> when the
    /// request is sent.
    /// </summary>
    public override string Method
    {
        get { return this.requestMessage.Method.ToString(); }
        set { this.requestMessage.Method = new HttpMethod(value); }
    }

    /// <summary>
    /// Gets or sets the credentials used by the underlying handler.
    /// </summary>
    public override ICredentials Credentials
    {
        get { return this.handler.Credentials; }
        set { this.handler.Credentials = value; }
    }

    /// <summary>
    /// Gets or sets the client timeout in whole seconds.
    /// </summary>
    public override int Timeout
    {
        get { return (int)this.client.Timeout.TotalSeconds; }
        set { this.client.Timeout = new TimeSpan(0, 0, value); }
    }

    /// <summary>
    /// Gets or sets a value that indicates whether to send data in segments to the Internet resource.
    /// </summary>
    public override bool SendChunked
    {
        get
        {
            bool? transferEncodingChunked = this.requestMessage.Headers.TransferEncodingChunked;
            return transferEncodingChunked.HasValue && transferEncodingChunked.Value;
        }

        set { this.requestMessage.Headers.TransferEncodingChunked = value; }
    }

    /// <summary>
    /// Returns the value of the named header, or null when it has not been set.
    /// </summary>
    /// <param name="headerName">Name of the header to read.</param>
    /// <returns>
    /// The header value; multiple values are joined with "," to match <see cref="Headers"/>.
    /// </returns>
    public override string GetHeader(string headerName)
    {
        string cachedValue;
        if (this.contentHeaderValueCache.TryGetValue(headerName, out cachedValue))
        {
            return cachedValue;
        }

        // TryGetValues instead of GetValues: a missing header yields null, not an exception.
        IEnumerable<string> values;
        if (this.requestMessage.Headers.TryGetValues(headerName, out values))
        {
            return string.Join(",", values);
        }

        return null;
    }

    /// <summary>
    /// Sets the named header, replacing any value set earlier.
    /// </summary>
    /// <param name="headerName">Name of the header to set.</param>
    /// <param name="headerValue">Value to assign.</param>
    public override void SetHeader(string headerName, string headerValue)
    {
        if (ContentHeaderNames.Contains(headerName))
        {
            // Content headers cannot be placed on the request itself; they are applied to
            // the content of each attempt when the request is sent.
            this.contentHeaderValueCache[headerName] = headerValue;
            return;
        }

        // Remove first so a repeated set replaces the value instead of appending to it.
        this.requestMessage.Headers.Remove(headerName);
        this.requestMessage.Headers.Add(headerName, headerValue);
    }

    /// <summary>
    /// Returns the in-memory stream that collects the request body.
    /// </summary>
    public override Stream GetStream()
    {
        return this.messageStream;
    }

    /// <summary>
    /// Abort the current request.
    /// </summary>
    public override void Abort()
    {
        this.client.CancelPendingRequests();
    }

    /// <summary>
    /// Begins retrieval of the request stream; completes immediately with the in-memory stream.
    /// </summary>
    public override IAsyncResult BeginGetRequestStream(AsyncCallback callback, object state)
    {
        var completed = new TaskCompletionSource<Stream>();
        completed.TrySetResult(this.messageStream);
        return completed.Task.ToApm(callback, state);
    }

    /// <summary>
    /// Completes <see cref="BeginGetRequestStream"/> and returns the request stream.
    /// </summary>
    public override Stream EndGetRequestStream(IAsyncResult asyncResult)
    {
        return ((Task<Stream>)asyncResult).Result;
    }

    /// <summary>
    /// Starts sending the request (with retries) and returns the APM handle for it.
    /// </summary>
    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
    {
        return this.CreateSendTask().ToApm(callback, state);
    }

    /// <summary>
    /// Completes <see cref="BeginGetResponse"/> and converts the HTTP response.
    /// </summary>
    public override Microsoft.OData.IODataResponseMessage EndGetResponse(IAsyncResult asyncResult)
    {
        var response = ((Task<HttpResponseMessage>)asyncResult).Result;
        return ConvertHttpClientResponse(response);
    }

    /// <summary>
    /// Sends the request synchronously (with retries) and converts the HTTP response.
    /// </summary>
    public override Microsoft.OData.IODataResponseMessage GetResponse()
    {
        var send = this.CreateSendTask();
        send.Wait();
        return ConvertHttpClientResponse(send.Result);
    }

    /// <summary>
    /// Starts a task that executes the send inside the retry policy.
    /// </summary>
    /// <returns>A task producing the response of the last attempt.</returns>
    private Task<HttpResponseMessage> CreateSendTask()
    {
        // Snapshot the body once; every attempt wraps the same bytes in new content.
        var messageContent = this.messageStream.ToArray();
        this.requestMessage.Method = new HttpMethod(this.ActualMethod);

        return System.Threading.Tasks.Task.Run(() =>
            this.pollyPolicy.Execute(() =>
            {
                Debug.WriteLine("Sending request - {0}", this.requestMessage.RequestUri);

                // A fresh message per attempt avoids the 'message already sent' error.
                using (var attempt = this.CreateAttemptMessage(messageContent))
                {
                    // Blocking on Result surfaces failures as AggregateException, exactly as
                    // the previous Wait() did, so the policy sees the same exception types.
                    return this.client.SendAsync(attempt).Result;
                }
            }));
    }

    /// <summary>
    /// Builds the message for a single attempt from the template, including body and content headers.
    /// </summary>
    /// <param name="messageContent">Request body bytes; an empty array means no body.</param>
    /// <returns>A new, unsent request message.</returns>
    private HttpRequestMessage CreateAttemptMessage(byte[] messageContent)
    {
        var attempt = new HttpRequestMessage
        {
            RequestUri = this.requestMessage.RequestUri,
            Method = this.requestMessage.Method
        };

        foreach (var header in this.requestMessage.Headers)
        {
            attempt.Headers.Add(header.Key, header.Value);
        }

        // Only set the message content if the stream has been written to, otherwise
        // HttpClient will complain if it's a GET request.
        if (messageContent.Length > 0)
        {
            attempt.Content = new ByteArrayContent(messageContent);

            // Apply cached "Content" header values
            foreach (var contentHeader in this.contentHeaderValueCache)
            {
                attempt.Content.Headers.Add(contentHeader.Key, contentHeader.Value);
            }
        }

        return attempt;
    }

    /// <summary>
    /// Flattens a header collection into name/value pairs, joining multiple values with ",".
    /// </summary>
    private static IDictionary<string, string> HttpHeadersToStringDictionary(HttpHeaders headers)
    {
        return headers.ToDictionary((h1) => h1.Key, (h2) => string.Join(",", h2.Value));
    }

    /// <summary>
    /// Wraps an <see cref="HttpResponseMessage"/> in the response type the OData client expects.
    /// </summary>
    /// <param name="response">Response returned by <see cref="HttpClient"/>.</param>
    /// <returns>Response message exposing headers, status code and a lazily read body stream.</returns>
    private static HttpWebResponseMessage ConvertHttpClientResponse(HttpResponseMessage response)
    {
        var allHeaders = HttpHeadersToStringDictionary(response.Headers).Concat(HttpHeadersToStringDictionary(response.Content.Headers));
        return new HttpWebResponseMessage(
            allHeaders.ToDictionary((h1) => h1.Key, (h2) => h2.Value),
            (int)response.StatusCode,
            () =>
            {
                // The body stream is only fetched when the client asks for it.
                var readTask = response.Content.ReadAsStreamAsync();
                readTask.Wait();
                return readTask.Result;
            });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment