Created
May 19, 2023 04:04
-
-
Save JamesNK/9cd342b3a547bc2f7963d8de5c18385c to your computer and use it in GitHub Desktop.
Detect HttpClient response end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics; | |
using System.Net; | |
// Demo program: prints the stopwatch tick count at each stage of a GET request so the
// moment the response body finishes can be compared with the other milestones.
var stopwatch = Stopwatch.StartNew();
var endCallback = () => Console.WriteLine($"{stopwatch.ElapsedTicks} - Response ended");

// The invoker, request and response are IDisposable; using declarations release them
// when the top-level scope ends (after the final "App exiting" line is written).
using var httpClient = new HttpMessageInvoker(new DetectEndRequestHandler(new SocketsHttpHandler(), endCallback));

Console.WriteLine($"{stopwatch.ElapsedTicks} - Send request");
using var request = new HttpRequestMessage(HttpMethod.Get, "https://www.google.com");
using var response = await httpClient.SendAsync(request, CancellationToken.None);
Console.WriteLine($"{stopwatch.ElapsedTicks} - Response headers received");

if (response.Content != null)
{
    Console.WriteLine($"{stopwatch.ElapsedTicks} - Read response body");

    // Drain the body into a throwaway buffer; the end callback is raised by the stream
    // returned from ReadAsStreamAsync once the copy completes.
    await using var content = await response.Content.ReadAsStreamAsync();
    using var sink = new MemoryStream();
    await content.CopyToAsync(sink);
}

Console.WriteLine($"{stopwatch.ElapsedTicks} - App exiting");
/// <summary>
/// A <see cref="DelegatingHandler"/> that invokes a callback when a response has ended:
/// immediately if the response has no content, otherwise once the response body has been
/// read to the end.
/// </summary>
class DetectEndRequestHandler : DelegatingHandler
{
    private readonly Action _endCallback;

    /// <summary>Creates the handler.</summary>
    /// <param name="handler">The inner handler that actually sends the request.</param>
    /// <param name="endCallback">Invoked when the response has ended.</param>
    /// <exception cref="ArgumentNullException"><paramref name="endCallback"/> is null.</exception>
    public DetectEndRequestHandler(HttpMessageHandler handler, Action endCallback) : base(handler)
    {
        _endCallback = endCallback ?? throw new ArgumentNullException(nameof(endCallback));
    }

    /// <summary>
    /// Sends the request through the inner handler and wraps the response content so the
    /// end of the body can be observed.
    /// </summary>
    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var response = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);

        if (response.Content is not null)
        {
            response.Content = new DetectEndContent(response.Content, _endCallback);
        }
        else
        {
            // No body to read, so the response is already complete.
            _endCallback();
        }

        return response;
    }

    /// <summary>
    /// Content wrapper that forwards to the original content and raises the end callback
    /// at most once, whichever way the body is consumed.
    /// </summary>
    private class DetectEndContent : HttpContent
    {
        private readonly HttpContent _inner;
        private readonly Action _endCallback;
        private Stream? _innerStream;

        // 0 until the end callback has been raised, then 1. Updated with Interlocked.
        private int _ended;

        public DetectEndContent(HttpContent inner, Action endCallback)
        {
            _inner = inner;
            _endCallback = endCallback;

            // Mirror the original content headers onto the wrapper.
            foreach (var header in inner.Headers)
            {
                Headers.TryAddWithoutValidation(header.Key, header.Value);
            }
        }

        /// <summary>Copies the inner content to <paramref name="stream"/>, then signals the end.</summary>
        protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
        {
            // Per the original author's note: this path is only expected to be hit by tests
            // that buffer the content (e.g. ReadAsByteArrayAsync). The gRPC client always
            // calls ReadAsStreamAsync, which goes through CreateContentReadStreamAsync.
            _innerStream = await _inner.ReadAsStreamAsync().ConfigureAwait(false);
            await _innerStream.CopyToAsync(stream).ConfigureAwait(false);
            SignalEnd();
        }

        /// <summary>Returns the inner content stream wrapped so that reaching its end signals the callback.</summary>
        protected override async Task<Stream> CreateContentReadStreamAsync()
        {
            var stream = await _inner.ReadAsStreamAsync().ConfigureAwait(false);
            return new StreamWrapper(stream, SignalEnd);
        }

        /// <summary>The length is never reported by this wrapper.</summary>
        protected override bool TryComputeLength(out long length)
        {
            length = -1;
            return false;
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                // This is important. Disposing original response content will cancel the gRPC call.
                _inner.Dispose();
                _innerStream?.Dispose();
            }

            base.Dispose(disposing);
        }

        // Raises the callback the first time it is called and ignores later calls.
        private void SignalEnd()
        {
            if (Interlocked.Exchange(ref _ended, 1) == 0)
            {
                _endCallback();
            }
        }
    }

    /// <summary>
    /// Stream decorator that forwards every operation to an inner stream and invokes a
    /// callback, at most once, when a read reports end of stream or a copy completes.
    /// </summary>
    internal sealed class StreamWrapper : Stream
    {
        private readonly Stream _inner;
        private readonly Action _endCallback;

        // 0 until the end callback has been raised, then 1. Updated with Interlocked.
        private int _ended;

        public StreamWrapper(Stream inner, Action endCallback)
        {
            _inner = inner;
            _endCallback = endCallback;
        }

        public override bool CanRead => _inner.CanRead;
        public override bool CanSeek => _inner.CanSeek;
        public override bool CanWrite => _inner.CanWrite;
        public override long Length => _inner.Length;

        public override long Position
        {
            get => _inner.Position;
            set => _inner.Position = value;
        }

        public override int ReadTimeout
        {
            get => _inner.ReadTimeout;
            set => _inner.ReadTimeout = value;
        }

        public override int WriteTimeout
        {
            get => _inner.WriteTimeout;
            set => _inner.WriteTimeout = value;
        }

        public override void Flush() => _inner.Flush();

        public override Task FlushAsync(CancellationToken cancellationToken) => _inner.FlushAsync(cancellationToken);

        public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);

        public override void SetLength(long value) => _inner.SetLength(value);

        public override void Write(byte[] buffer, int offset, int count) => _inner.Write(buffer, offset, count);

        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
            _inner.WriteAsync(buffer, offset, count, cancellationToken);

        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
            _inner.WriteAsync(buffer, cancellationToken);

        public override int Read(byte[] buffer, int offset, int count)
        {
            var read = _inner.Read(buffer, offset, count);
            SignalEndIfFinished(read, count);
            return read;
        }

        public override int Read(Span<byte> buffer)
        {
            var read = _inner.Read(buffer);
            SignalEndIfFinished(read, buffer.Length);
            return read;
        }

        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            var read = await _inner.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
            SignalEndIfFinished(read, count);
            return read;
        }

        public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
        {
            var read = await _inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
            SignalEndIfFinished(read, buffer.Length);
            return read;
        }

        public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
        {
            await _inner.CopyToAsync(destination, bufferSize, cancellationToken).ConfigureAwait(false);

            // The copy only completes once the inner stream has been drained.
            SignalEnd();
        }

        public override async ValueTask DisposeAsync()
        {
            // Dispose the inner stream asynchronously first. base.DisposeAsync() goes through
            // Dispose(true), which would otherwise dispose it synchronously before the async
            // path gets a chance to run.
            await _inner.DisposeAsync().ConfigureAwait(false);
            await base.DisposeAsync().ConfigureAwait(false);
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _inner.Dispose();
            }
        }

        // A read that returns 0 means end of stream only when bytes were actually requested;
        // a zero-length read returns 0 regardless of how much data remains.
        private void SignalEndIfFinished(int bytesRead, int bytesRequested)
        {
            if (bytesRead == 0 && bytesRequested > 0)
            {
                SignalEnd();
            }
        }

        // Raises the callback the first time it is called and ignores later calls.
        private void SignalEnd()
        {
            if (Interlocked.Exchange(ref _ended, 1) == 0)
            {
                _endCallback();
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: