Skip to content

Instantly share code, notes, and snippets.

@antonfirsov
Last active July 12, 2023 18:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antonfirsov/6bcd1e2d099eef52a077dd0fab862bfb to your computer and use it in GitHub Desktop.
Save antonfirsov/6bcd1e2d099eef52a077dd0fab862bfb to your computer and use it in GitHub Desktop.
#define METRICS
using System.Diagnostics.Metrics;
using BenchmarkDotNet.Attributes;
using System.Text;
using System.Diagnostics;
using System.Threading.Tasks.Sources;
using System.Net;
#if METRICS
using System.Net.Http.Metrics;
#endif
namespace EnrichmentBenchmarks
{
/// <summary>
/// Shared setup for benchmarks that send the same HTTP/1.1 GET request in three configurations:
/// no metrics listener, a listener on the "System.Net.Http" meter, and a listener plus an
/// enrichment callback that adds a custom tag.
/// </summary>
/// <remarks>
/// No network is involved: the handler's connect callback hands out an in-memory
/// <see cref="ResponseStream"/> instead of a socket stream.
/// </remarks>
[MemoryDiagnoser]
public abstract class EnrichmentBenchmarkBase
{
    /// <summary>Client used by the benchmark methods; assigned by the matching global setup.</summary>
    protected HttpClient? _httpClient;

    // Only started by EnableMetrics(); the "NoMetrics" setup leaves it idle.
    private readonly MeterListener _meterListener = new MeterListener();

    /// <summary>Target of every benchmark request.</summary>
    protected static readonly Uri RequestUri = new Uri("http://test/");

    /// <summary>
    /// Number of connect callbacks that must be running before any of them returns a stream
    /// (see <see cref="CreateHandler"/>). Defaults to 1.
    /// </summary>
    protected virtual int GetConcurrency() => 1;

    static EnrichmentBenchmarkBase()
    {
        // Prints which runtime build the benchmark process actually loaded.
        Console.WriteLine($"RUNTIME LOC: {typeof(object).Assembly.Location}");
    }

    /// <summary>
    /// Enables measurement events for every instrument published by the "System.Net.Http" meter
    /// and starts the listener.
    /// </summary>
    /// <remarks>
    /// No measurement callback is registered here; the listener only switches the instruments on.
    /// </remarks>
    protected void EnableMetrics()
    {
        _meterListener.InstrumentPublished = (instrument, listener) =>
        {
            if (instrument.Meter.Name == "System.Net.Http")
            {
                Console.WriteLine($"ENABLED {instrument.Name}");
                listener.EnableMeasurementEvents(instrument);
            }
        };
        _meterListener.Start();
    }

    /// <summary>
    /// Creates a <see cref="SocketsHttpHandler"/> with proxy, redirects, decompression, cookies and
    /// activity header propagation turned off, whose connections are in-memory
    /// <see cref="ResponseStream"/> instances.
    /// </summary>
    /// <returns>A new handler; the caller owns it.</returns>
    protected SocketsHttpHandler CreateHandler()
    {
        // Rendezvous for the connect callbacks: each one signals and then blocks until
        // GetConcurrency() callbacks have signalled, so none of them returns a stream earlier.
        // NOTE: CountdownEvent.Signal() throws InvalidOperationException once the count has
        // reached zero, so a connection opened beyond GetConcurrency() fails instead of connecting.
        CountdownEvent connectCallbackLock = new CountdownEvent(GetConcurrency());
        return new SocketsHttpHandler
        {
            UseProxy = false,
            AllowAutoRedirect = false,
            AutomaticDecompression = DecompressionMethods.None,
            UseCookies = false,
            ActivityHeadersPropagator = null,
            PooledConnectionIdleTimeout = TimeSpan.FromDays(10), // Avoid the cleaning timer executing during the benchmark
            ConnectCallback = (context, cancellation) =>
            {
                connectCallbackLock.Signal();
                connectCallbackLock.Wait(cancellation);
                return new ValueTask<Stream>(new ResponseStream());
            }
        };
    }

    /// <summary>Sends one GET request to <see cref="RequestUri"/>, pinned to HTTP/1.1.</summary>
    /// <returns>The task returned by <see cref="HttpClient.SendAsync(HttpRequestMessage)"/>.</returns>
    protected Task SendRequestAsync() => _httpClient!.SendAsync(new HttpRequestMessage(HttpMethod.Get, RequestUri)
    {
        Version = HttpVersion.Version11,
        VersionPolicy = HttpVersionPolicy.RequestVersionExact,
    });

    /// <summary>Setup for the "NoMetrics" benchmarks: plain client, listener never started.</summary>
    [GlobalSetup(Target = "NoMetrics")]
    public void SetupNoMetrics()
    {
        Console.WriteLine("-- SetupNoMetrics --");
        _httpClient = new HttpClient(CreateHandler());
    }

    /// <summary>
    /// Releases what the global setup created: disposing the client also disposes its handler
    /// chain, and disposing the listener stops it from observing instruments.
    /// </summary>
    [GlobalCleanup]
    public void DisposeClientAndListener()
    {
        _httpClient?.Dispose();
        _httpClient = null;
        _meterListener.Dispose();
    }

#if METRICS
    /// <summary>Setup for the "WithMetrics" benchmarks: plain client plus an active listener.</summary>
    [GlobalSetup(Target = "WithMetrics")]
    public void SetupMetrics()
    {
        Console.WriteLine("-- SetupMetrics --");
        _httpClient = new HttpClient(CreateHandler());
        EnableMetrics();
    }

    /// <summary>
    /// Setup for the "WithMetricsAndEnrichment" benchmarks: the client is wrapped in an
    /// <see cref="EnrichmentHandler"/> and the listener is active.
    /// </summary>
    [GlobalSetup(Target = "WithMetricsAndEnrichment")]
    public void SetupMetricsWithEnrichment()
    {
        Console.WriteLine("-- SetupMetricsWithEnrichment --");
        _httpClient = new HttpClient(new EnrichmentHandler(CreateHandler()));
        EnableMetrics();
    }

    /// <summary>
    /// Delegating handler that registers an enrichment callback on every outgoing request before
    /// forwarding it to the inner handler.
    /// </summary>
    private sealed class EnrichmentHandler : DelegatingHandler
    {
        public EnrichmentHandler(HttpMessageHandler innerHandler) : base(innerHandler)
        {
        }

        protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken)
        {
            HttpMetricsEnrichmentContext.AddCallback(request, Enrich);
            return base.Send(request, cancellationToken);
        }

        protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
        {
            HttpMetricsEnrichmentContext.AddCallback(request, Enrich);
            return base.SendAsync(request, cancellationToken);
        }

        // Adds one fixed custom tag ("x" = "y").
        private static void Enrich(HttpMetricsEnrichmentContext context)
        {
            context.AddCustomTag("x", "y");
        }
    }
#endif
}
/// <summary>
/// Measures a single request per invocation in each of the three configurations prepared by
/// the global setups of <see cref="EnrichmentBenchmarkBase"/>.
/// </summary>
public class EnrichmentBenchmark_SingleRequest : EnrichmentBenchmarkBase
{
    /// <summary>One request; paired with the "NoMetrics" setup.</summary>
    [Benchmark]
    public Task NoMetrics()
    {
        return SendRequestAsync();
    }

#if METRICS
    /// <summary>One request; paired with the "WithMetrics" setup.</summary>
    [Benchmark]
    public Task WithMetrics()
    {
        return SendRequestAsync();
    }

    /// <summary>One request; paired with the "WithMetricsAndEnrichment" setup.</summary>
    [Benchmark]
    public Task WithMetricsAndEnrichment()
    {
        return SendRequestAsync();
    }
#endif
}
/// <summary>
/// Measures <see cref="Concurrency"/> requests started together per invocation, in each of the
/// three configurations prepared by the global setups of <see cref="EnrichmentBenchmarkBase"/>.
/// </summary>
public class EnrichmentBenchmark_ParallelRequests : EnrichmentBenchmarkBase
{
    /// <summary>How many requests each invocation starts.</summary>
    [Params(20)]
    public int Concurrency { get; set; }

    /// <summary>Reports <see cref="Concurrency"/> to the base class.</summary>
    protected override int GetConcurrency()
    {
        return Concurrency;
    }

    /// <summary>
    /// Queues <see cref="Concurrency"/> sends via <see cref="Task.Run(Func{Task})"/> and returns a
    /// task that completes when all of them have completed.
    /// </summary>
    private Task SendParallelRequestsAsync()
    {
        Task[] inFlight = new Task[Concurrency];
        int slot = 0;
        while (slot < Concurrency)
        {
            inFlight[slot] = Task.Run(SendRequestAsync);
            slot++;
        }

        return Task.WhenAll(inFlight);
    }

    /// <summary>Parallel requests; paired with the "NoMetrics" setup.</summary>
    [Benchmark]
    public Task NoMetrics()
    {
        return SendParallelRequestsAsync();
    }

#if METRICS
    /// <summary>Parallel requests; paired with the "WithMetrics" setup.</summary>
    [Benchmark]
    public Task WithMetrics()
    {
        return SendParallelRequestsAsync();
    }

    /// <summary>Parallel requests; paired with the "WithMetricsAndEnrichment" setup.</summary>
    [Benchmark]
    public Task WithMetricsAndEnrichment()
    {
        return SendParallelRequestsAsync();
    }
#endif
}
}
/// <summary>
/// In-memory stream standing in for a connection: what is written to it is ignored, and each
/// read yields the whole canned response once a write has been observed.
/// </summary>
/// <remarks>
/// A write either completes a read that is already waiting or marks the next read as ready.
/// The stream is its own <see cref="IValueTaskSource{TResult}"/>, so a waiting read is
/// represented without allocating a task.
/// </remarks>
public sealed class ResponseStream : Stream, IValueTaskSource<int>
{
    // Spelled with explicit CRLF sequences so the bytes on the wire do not depend on the source
    // file's line endings, and so the empty line that ends the header section cannot be lost.
    private static readonly byte[] DefaultResponse = Encoding.UTF8.GetBytes(
        "HTTP/1.1 200 OK\r\n" +
        "Content-Type: text/plain; charset=UTF-8\r\n" +
        "Content-Length: 4\r\n" +
        "Connection: Keep-Alive\r\n" +
        "\r\n" +
        "test");

    // Private gate instead of lock(this), so no outside code can contend on the same monitor.
    private readonly object _gate = new();

    // Mutable struct: must stay a non-readonly field.
    private ManualResetValueTaskSourceCore<int> _waitSource = new() { RunContinuationsAsynchronously = true };
    private bool _writeCompleted; // a write arrived and no read has consumed it yet
    private bool _readStarted;    // a read is waiting for the next write
    private readonly byte[] _responseData;
    private readonly bool _forceAsyncYield;

    /// <summary>Creates a stream that answers with the built-in "200 OK" response.</summary>
    /// <param name="forceAsyncYield">
    /// When true, every <see cref="ReadAsync(Memory{byte}, CancellationToken)"/> first awaits
    /// <see cref="Task.Yield"/>.
    /// </param>
    public ResponseStream(bool forceAsyncYield = true)
        : this(DefaultResponse, forceAsyncYield)
    {
    }

    /// <summary>Creates a stream that answers every read with <paramref name="responseData"/>.</summary>
    /// <param name="responseData">Bytes copied into the caller's buffer on each read.</param>
    /// <param name="forceAsyncYield">
    /// When true, every <see cref="ReadAsync(Memory{byte}, CancellationToken)"/> first awaits
    /// <see cref="Task.Yield"/>.
    /// </param>
    public ResponseStream(byte[] responseData, bool forceAsyncYield = true)
    {
        _responseData = responseData;
        _forceAsyncYield = forceAsyncYield;
    }

    /// <summary>
    /// Copies the response into <paramref name="buffer"/> and completes with its length, either
    /// immediately (a write already happened) or when the next write arrives.
    /// </summary>
    /// <remarks>The cancellation token is not observed.</remarks>
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        return _forceAsyncYield
            ? ReadAsyncSlow(buffer, cancellationToken)
            : ReadAsyncCore(buffer, cancellationToken);
    }

    // Same as ReadAsyncCore, but yields first so the read never completes synchronously.
    private async ValueTask<int> ReadAsyncSlow(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        await Task.Yield();
        return await ReadAsyncCore(buffer, cancellationToken);
    }

    private ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        // The payload is copied up front; only the completion is deferred until a write arrives.
        _responseData.CopyTo(buffer.Span);
        lock (_gate)
        {
            if (_writeCompleted)
            {
                _writeCompleted = false;
                return new ValueTask<int>(_responseData.Length);
            }
            else
            {
                _readStarted = true;
                _waitSource.Reset();
                return new ValueTask<int>(this, _waitSource.Version);
            }
        }
    }

    /// <summary>Synchronous read; expects a write to have happened already.</summary>
    public override int Read(Span<byte> buffer)
    {
        Debug.Assert(_writeCompleted);
        _writeCompleted = false;
        _responseData.CopyTo(buffer);
        return _responseData.Length;
    }

    /// <summary>
    /// Discards <paramref name="buffer"/>; completes a waiting read or marks the next read ready.
    /// </summary>
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        lock (_gate)
        {
            if (_readStarted)
            {
                _readStarted = false;
                _waitSource.SetResult(_responseData.Length);
            }
            else
            {
                _writeCompleted = true;
            }
        }
        return default;
    }

    /// <summary>Synchronous write; expects that no read is currently waiting.</summary>
    public override void Write(ReadOnlySpan<byte> buffer)
    {
        Debug.Assert(!_readStarted);
        _writeCompleted = true;
    }

    // IValueTaskSource<int> plumbing: forwards to the reusable completion source.
    public int GetResult(short token) =>
        _waitSource.GetResult(token);

    public ValueTaskSourceStatus GetStatus(short token) =>
        _waitSource.GetStatus(token);

    public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
        _waitSource.OnCompleted(continuation, state, token, flags);

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    // Everything below is unsupported by this stream and throws.
    public override void Flush() => throw new InvalidOperationException();
    public override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
    public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();
    public override void SetLength(long value) => throw new InvalidOperationException();
    public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
    public override long Length => throw new InvalidOperationException();
    public override long Position { get => throw new InvalidOperationException(); set => throw new InvalidOperationException(); }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment