Skip to content

Instantly share code, notes, and snippets.

@clemensv
Created November 15, 2021 13:15
Show Gist options
  • Save clemensv/ae2220ba97a74fab8d8aa210720e64e0 to your computer and use it in GitHub Desktop.
Save clemensv/ae2220ba97a74fab8d8aa210720e64e0 to your computer and use it in GitHub Desktop.
Very simple latency test for event hubs
using System;
namespace event_hubs_latency
{
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Messaging.EventHubs.Producer;
using Azure.Storage.Blobs;
using MathNet.Numerics.Statistics;
partial class Program
{
private const string cxs = "... EH connection string ....";
private const string stg = "... storage connection string ...";
private const string ctr = "storagecontainer";
private const string topic = "telemetry";
static int totalEvents = 3000;
static int prefetchCount = 0;
static int pendingEvents = totalEvents;
static void Main(string[] args)
{
Run().GetAwaiter().GetResult();
}
static async Task Run()
{
Console.ReadLine();
var now = DateTime.UtcNow;
Stopwatch sw = new Stopwatch();
sw.Start();
EventHubConnection ehci = new EventHubConnection(cxs, topic);
BlobContainerClient checkpointClient = new BlobContainerClient(stg, ctr);
var tracker = new ConcurrentDictionary<Guid, Tuple<long, long>>();
var mre = new ManualResetEvent(false);
EventProcessorClientOptions clientOptions = new EventProcessorClientOptions() {
PrefetchCount = prefetchCount,
LoadBalancingStrategy = LoadBalancingStrategy.Greedy
};
var ehp = new EventProcessorClient(checkpointClient, "$Default", cxs, "telemetry", clientOptions);
ehp.PartitionInitializingAsync += async (arg) =>
{
arg.DefaultStartingPosition = EventPosition.FromEnqueuedTime(now);
};
ehp.ProcessEventAsync += async (args) =>
{
if (!args.HasEvent)
return;
var eventId = new Guid(args.Data.CorrelationId);
if (tracker.ContainsKey(eventId))
{
tracker[eventId] = new Tuple<long, long>(tracker[eventId].Item1, sw.ElapsedTicks);
_ = Console.Out.WriteLineAsync($"{pendingEvents}, {args.Partition.PartitionId} ev {eventId}, start: {tracker[eventId].Item1}, stop: {sw.ElapsedTicks}, duration: {(sw.ElapsedTicks - tracker[eventId].Item1) / 10000.0} msec");
if (Interlocked.Decrement(ref pendingEvents) == 0)
{
mre.Set();
}
}
};
ehp.ProcessErrorAsync += async args => { };
ehp.StartProcessing();
var ehpc = new EventHubProducerClient(ehci);
var payload = new byte[1024];
for (int i = -25; i < totalEvents; i++)
{
var eventid = Guid.NewGuid();
if (i >= 0)
{
tracker.TryAdd(eventid, new Tuple<long, long>(sw.ElapsedTicks, -1));
}
var evd = new EventData(payload);
evd.CorrelationId = eventid.ToString();
await ehpc.SendAsync(new[] { evd });
}
mre.WaitOne();
ehp.StopProcessing();
_ = Console.Out.WriteLineAsync($"total {tracker.Count}, mean: {Statistics.Mean(from kv in tracker select ((kv.Value.Item2 - kv.Value.Item1) / 10000.0))} msec, median: {Statistics.Median(from kv in tracker select ((kv.Value.Item2 - kv.Value.Item1) / 10000.0))}, P99: {Statistics.Quantile(from kv in tracker select ((kv.Value.Item2 - kv.Value.Item1) / 10000.0), 0.99)}, P99.9: {Statistics.Quantile(from kv in tracker select ((kv.Value.Item2 - kv.Value.Item1) / 10000.0), 0.999)}");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment