Created
November 15, 2021 13:15
-
-
Save clemensv/ae2220ba97a74fab8d8aa210720e64e0 to your computer and use it in GitHub Desktop.
Very simple latency test for Azure Event Hubs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace event_hubs_latency | |
{ | |
using System.Collections.Concurrent; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Azure.Messaging.EventHubs; | |
using Azure.Messaging.EventHubs.Consumer; | |
using Azure.Messaging.EventHubs.Processor; | |
using Azure.Messaging.EventHubs.Producer; | |
using Azure.Storage.Blobs; | |
using MathNet.Numerics.Statistics; | |
/// <summary>
/// Very simple latency test for Event Hubs: sends single-event batches to a hub while an
/// <see cref="EventProcessorClient"/> reads from the same hub, and reports the elapsed time
/// between handing each event to the producer and seeing it in the processor's event handler.
/// </summary>
partial class Program
{
    // Connection settings; replace the placeholders before running.
    private const string cxs = "... EH connection string ....";
    private const string stg = "... storage connection string ...";
    private const string ctr = "storagecontainer";
    private const string topic = "telemetry";

    // Number of events whose latency is measured.
    static int totalEvents = 3000;

    // Prefetch count handed to the processor options.
    static int prefetchCount = 0;

    // Measured events that have not been received yet; decremented by the event handler.
    static int pendingEvents = totalEvents;

    /// <summary>Synchronous entry point; blocks until <see cref="Run"/> has finished.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Run().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs the test: waits for a line on standard input, starts the processor, sends a few
    /// untracked warm-up events followed by <c>totalEvents</c> tracked events, waits until every
    /// tracked event has been received once, and prints mean/median/P99/P99.9 latency.
    /// </summary>
    /// <returns>A task that completes after the summary line has been written.</returns>
    static async Task Run()
    {
        // Events sent before measuring starts; they are not added to the tracker and therefore
        // never count toward the statistics.
        const int warmupEvents = 25;

        // Hold until the operator presses Enter.
        Console.ReadLine();

        var now = DateTime.UtcNow;
        var sw = Stopwatch.StartNew();

        // Stopwatch ticks are expressed in units of Stopwatch.Frequency (ticks per second), which
        // is platform dependent; they are NOT the fixed 100 ns ticks of TimeSpan/DateTime.
        double ToMilliseconds(long stopwatchTicks) => stopwatchTicks * 1000.0 / Stopwatch.Frequency;

        // Event id -> (send tick, receive tick); the receive tick is -1 until the event arrives.
        var tracker = new ConcurrentDictionary<Guid, Tuple<long, long>>();

        // Completed by the handler when the last tracked event arrives. Continuations run
        // asynchronously so the awaiting code does not execute on the processor's handler thread.
        var completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var connection = new EventHubConnection(cxs, topic);
        var producer = new EventHubProducerClient(connection);
        var checkpointClient = new BlobContainerClient(stg, ctr);
        var clientOptions = new EventProcessorClientOptions
        {
            PrefetchCount = prefetchCount,
            LoadBalancingStrategy = LoadBalancingStrategy.Greedy
        };
        var processor = new EventProcessorClient(
            checkpointClient,
            EventHubConsumerClient.DefaultConsumerGroupName,
            cxs,
            topic,
            clientOptions);

        processor.PartitionInitializingAsync += arg =>
        {
            // Start reading at the moment the test began rather than at the start of the stream.
            arg.DefaultStartingPosition = EventPosition.FromEnqueuedTime(now);
            return Task.CompletedTask;
        };

        processor.ProcessEventAsync += args =>
        {
            if (!args.HasEvent)
            {
                return Task.CompletedTask;
            }

            // Take the receive timestamp once, before any bookkeeping, so that the stored value,
            // the printed value and the printed duration all agree.
            long stop = sw.ElapsedTicks;

            // Skip events that are not ours (missing/non-GUID correlation id, warm-up events) and
            // events already recorded (redelivery). TryUpdate only succeeds for the one caller
            // that still observes the untouched entry, so each event is counted exactly once.
            if (!Guid.TryParse(args.Data.CorrelationId, out Guid eventId) ||
                !tracker.TryGetValue(eventId, out Tuple<long, long> pending) ||
                pending.Item2 != -1 ||
                !tracker.TryUpdate(eventId, Tuple.Create(pending.Item1, stop), pending))
            {
                return Task.CompletedTask;
            }

            int remaining = Interlocked.Decrement(ref pendingEvents);

            // Deliberately not awaited: console output must not hold up the event handler.
            _ = Console.Out.WriteLineAsync($"{remaining + 1}, {args.Partition.PartitionId} ev {eventId}, start: {pending.Item1}, stop: {stop}, duration: {ToMilliseconds(stop - pending.Item1)} msec");

            if (remaining == 0)
            {
                completed.TrySetResult(true);
            }

            return Task.CompletedTask;
        };

        // Surface processor failures instead of silently discarding them.
        processor.ProcessErrorAsync += args =>
            Console.Error.WriteLineAsync($"Processor error (partition: {args.PartitionId}, operation: {args.Operation}): {args.Exception}");

        try
        {
            await processor.StartProcessingAsync();

            var payload = new byte[1024];
            for (int i = -warmupEvents; i < totalEvents; i++)
            {
                var eventId = Guid.NewGuid();
                if (i >= 0)
                {
                    // Register before sending so the handler always finds the entry.
                    tracker.TryAdd(eventId, new Tuple<long, long>(sw.ElapsedTicks, -1));
                }

                var evd = new EventData(payload);
                evd.CorrelationId = eventId.ToString();
                await producer.SendAsync(new[] { evd });
            }

            // With nothing to measure the handler never signals completion; do not wait forever.
            if (Volatile.Read(ref pendingEvents) <= 0)
            {
                completed.TrySetResult(true);
            }

            await completed.Task;
        }
        finally
        {
            await processor.StopProcessingAsync();
            await producer.CloseAsync();
            await connection.CloseAsync();
        }

        // Materialize once; the statistics below would otherwise re-run the projection four times.
        double[] latencies = tracker.Values
            .Where(entry => entry.Item2 != -1)
            .Select(entry => ToMilliseconds(entry.Item2 - entry.Item1))
            .ToArray();

        // Awaited so the summary is flushed before the process exits.
        await Console.Out.WriteLineAsync($"total {tracker.Count}, mean: {Statistics.Mean(latencies)} msec, median: {Statistics.Median(latencies)}, P99: {Statistics.Quantile(latencies, 0.99)}, P99.9: {Statistics.Quantile(latencies, 0.999)}");
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment