Skip to content

Instantly share code, notes, and snippets.

@JamesNK
Created May 19, 2023 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JamesNK/9cd342b3a547bc2f7963d8de5c18385c to your computer and use it in GitHub Desktop.
Save JamesNK/9cd342b3a547bc2f7963d8de5c18385c to your computer and use it in GitHub Desktop.
Detect HttpClient response end
using System.Diagnostics;
using System.Net;
// Demo: print a timestamp (in Stopwatch ticks) at each stage of a single HTTP GET,
// including the moment the DetectEndRequestHandler reports that the response has ended.
var stopwatch = Stopwatch.StartNew();
var endCallback = () => Console.WriteLine($"{stopwatch.ElapsedTicks} - Response ended");

// The invoker owns the handler chain; `using` releases it (and the response below)
// when the program finishes instead of leaving them undisposed.
using var httpClient = new HttpMessageInvoker(new DetectEndRequestHandler(new SocketsHttpHandler(), endCallback));

Console.WriteLine($"{stopwatch.ElapsedTicks} - Send request");
using var request = new HttpRequestMessage(HttpMethod.Get, "https://www.google.com");
using var response = await httpClient.SendAsync(request, CancellationToken.None);
Console.WriteLine($"{stopwatch.ElapsedTicks} - Response headers received");

if (response.Content is not null)
{
    Console.WriteLine($"{stopwatch.ElapsedTicks} - Read response body");

    // Drain the body so the wrapping stream observes the end of the response.
    await using var content = await response.Content.ReadAsStreamAsync();
    await content.CopyToAsync(new MemoryStream());
}

Console.WriteLine($"{stopwatch.ElapsedTicks} - App exiting");
class DetectEndRequestHandler : DelegatingHandler
{
private readonly Action _endCallback;
/// <summary>
/// Creates a handler that forwards requests to <paramref name="handler"/> and invokes
/// <paramref name="endCallback"/> when a response has ended.
/// </summary>
/// <param name="handler">The inner handler that actually sends the request.</param>
/// <param name="endCallback">
/// Invoked when a response has no content, or when its content has been read to the end.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="endCallback"/> is <c>null</c>.</exception>
public DetectEndRequestHandler(HttpMessageHandler handler, Action endCallback) : base(handler)
{
    // Fail fast here rather than with a NullReferenceException when the first response ends.
    ArgumentNullException.ThrowIfNull(endCallback);
    _endCallback = endCallback;
}
/// <summary>
/// Sends the request through the inner handler and arranges for the end callback to run
/// when the response is over.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="cancellationToken">Token passed through to the inner handler.</param>
/// <returns>The inner handler's response, with its content wrapped when content is present.</returns>
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
    var httpResponse = await base.SendAsync(request, cancellationToken);

    if (httpResponse.Content is not { } body)
    {
        // No content to read, so the response is already over.
        _endCallback();
        return httpResponse;
    }

    // Replace the content with a wrapper that reports when the body has been consumed.
    httpResponse.Content = new DetectEndContent(body, _endCallback);
    return httpResponse;
}
/// <summary>
/// Wraps a response's <see cref="HttpContent"/> and invokes a callback, at most once,
/// when the wrapped content has been read to the end.
/// </summary>
private class DetectEndContent : HttpContent
{
    private readonly HttpContent _inner;
    private readonly Action _endCallback;
    private Stream? _innerStream;

    // 0 until the end callback has been invoked, then 1. Updated with Interlocked so the
    // callback runs only once even if several read paths reach the end of the content.
    private int _endSignalled;

    /// <summary>Creates a wrapper around <paramref name="inner"/>.</summary>
    /// <param name="inner">The original response content.</param>
    /// <param name="endCallback">Invoked once when the content has been fully read.</param>
    public DetectEndContent(HttpContent inner, Action endCallback)
    {
        _inner = inner;
        _endCallback = endCallback;

        // Copy the original content headers onto this wrapper so callers still see them.
        foreach (var header in inner.Headers)
        {
            Headers.TryAddWithoutValidation(header.Key, header.Value);
        }
    }

    /// <summary>
    /// Copies the wrapped content into <paramref name="stream"/> and then signals the end.
    /// </summary>
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
    {
        // Used when the content is buffered or copied (for example via ReadAsByteArrayAsync)
        // rather than read through ReadAsStreamAsync, which goes to CreateContentReadStreamAsync.
        _innerStream = await _inner.ReadAsStreamAsync().ConfigureAwait(false);
        await _innerStream.CopyToAsync(stream).ConfigureAwait(false);
        SignalEnd();
    }

    /// <summary>
    /// Returns the wrapped content's stream inside a <see cref="StreamWrapper"/> that signals
    /// the end once the stream has been read to completion.
    /// </summary>
    protected override async Task<Stream> CreateContentReadStreamAsync()
    {
        var stream = await _inner.ReadAsStreamAsync().ConfigureAwait(false);
        return new StreamWrapper(stream, SignalEnd);
    }

    /// <summary>Always reports that the length cannot be computed by this wrapper.</summary>
    protected override bool TryComputeLength(out long length)
    {
        length = -1;
        return false;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Important: this wrapper replaced the original content on the response, so it is
            // responsible for disposing the original content and any stream obtained from it.
            _inner.Dispose();
            _innerStream?.Dispose();
        }

        base.Dispose(disposing);
    }

    // Invokes the end callback the first time it is called and does nothing afterwards.
    private void SignalEnd()
    {
        if (Interlocked.Exchange(ref _endSignalled, 1) == 0)
        {
            _endCallback();
        }
    }
}
/// <summary>
/// A pass-through <see cref="Stream"/> that invokes a callback, at most once, when the
/// inner stream has been read to its end.
/// </summary>
internal sealed class StreamWrapper : Stream
{
    private readonly Stream _inner;
    private readonly Action _endCallback;

    // 0 until the end callback has been invoked, then 1.
    private int _endSignalled;

    /// <summary>Creates a wrapper around <paramref name="inner"/>.</summary>
    /// <param name="inner">The stream every operation is forwarded to.</param>
    /// <param name="endCallback">Invoked once when a read reaches the end of the stream.</param>
    public StreamWrapper(Stream inner, Action endCallback)
    {
        _inner = inner;
        _endCallback = endCallback;
    }

    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => _inner.CanWrite;

    // Forwarded so CanTimeout agrees with the forwarded ReadTimeout/WriteTimeout properties.
    public override bool CanTimeout => _inner.CanTimeout;

    public override long Length => _inner.Length;

    public override long Position
    {
        get => _inner.Position;
        set => _inner.Position = value;
    }

    public override int ReadTimeout
    {
        get => _inner.ReadTimeout;
        set => _inner.ReadTimeout = value;
    }

    public override int WriteTimeout
    {
        get => _inner.WriteTimeout;
        set => _inner.WriteTimeout = value;
    }

    public override void Flush() => _inner.Flush();

    public override int Read(byte[] buffer, int offset, int count)
    {
        var bytesRead = _inner.Read(buffer, offset, count);
        SignalEndIfFinished(bytesRead, count);
        return bytesRead;
    }

    public override int Read(Span<byte> buffer)
    {
        var bytesRead = _inner.Read(buffer);
        SignalEndIfFinished(bytesRead, buffer.Length);
        return bytesRead;
    }

    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);

    public override void SetLength(long value) => _inner.SetLength(value);

    public override void Write(byte[] buffer, int offset, int count) => _inner.Write(buffer, offset, count);

    public override Task FlushAsync(CancellationToken cancellationToken) => _inner.FlushAsync(cancellationToken);

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        _inner.WriteAsync(buffer, offset, count, cancellationToken);

    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
        _inner.WriteAsync(buffer, cancellationToken);

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        var bytesRead = await _inner.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
        SignalEndIfFinished(bytesRead, count);
        return bytesRead;
    }

    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        var bytesRead = await _inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
        SignalEndIfFinished(bytesRead, buffer.Length);
        return bytesRead;
    }

    public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        await _inner.CopyToAsync(destination, bufferSize, cancellationToken).ConfigureAwait(false);

        // The copy only completes once the inner stream has been read to its end.
        SignalEnd();
    }

    public override async ValueTask DisposeAsync()
    {
        // Dispose the inner stream asynchronously first. base.DisposeAsync() ends up in
        // Dispose(true), which would otherwise dispose the inner stream synchronously
        // before the asynchronous dispose had a chance to run.
        await _inner.DisposeAsync().ConfigureAwait(false);
        await base.DisposeAsync().ConfigureAwait(false);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (disposing)
        {
            _inner.Dispose();
        }
    }

    // A read that returns 0 bytes means end-of-stream only when bytes were actually requested;
    // a zero-length read returns 0 without saying anything about the end of the stream.
    private void SignalEndIfFinished(int bytesRead, int bytesRequested)
    {
        if (bytesRead == 0 && bytesRequested > 0)
        {
            SignalEnd();
        }
    }

    // Invokes the end callback the first time it is called and does nothing afterwards.
    private void SignalEnd()
    {
        if (Interlocked.Exchange(ref _endSignalled, 1) == 0)
        {
            _endCallback();
        }
    }
}
}
@JamesNK
Copy link
Author

JamesNK commented May 19, 2023

Output:

148507 - Send request
4805160 - Response headers received
4806028 - Read response body
4839592 - Response ended
4840081 - App exiting

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment