Skip to content

Instantly share code, notes, and snippets.

@jonnepmyra
Created November 8, 2023 07:28
Show Gist options
  • Save jonnepmyra/ae40219ba6f7194f457fb24f3d68915f to your computer and use it in GitHub Desktop.
Save jonnepmyra/ae40219ba6f7194f457fb24f3d68915f to your computer and use it in GitHub Desktop.
AMQP vs. stream protocol consumer benchmark
using System.Net;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;
using BenchmarkDotNet.Running;
using RabbitMQ.Client;
using RabbitMQ.Stream.Client;
// Program entry point (top-level statement): hand the RabbitMQConsumer benchmark class
// to BenchmarkDotNet. The value returned by Run is not used, so it is discarded explicitly.
_ = BenchmarkRunner.Run<RabbitMQConsumer>();
/// <summary>
/// Benchmarks how long a newly attached consumer waits for its first message from a
/// RabbitMQ stream, once through RabbitMQ.Client (<see cref="AmqpStreamConsumer"/>, the baseline)
/// and once through RabbitMQ.Stream.Client (<see cref="StreamProtocolConsumer"/>).
/// </summary>
/// <remarks>
/// Every benchmark invocation attaches a new consumer at <see cref="Offset"/> and completes as soon
/// as the first delivery arrives. Both benchmarks pass <c>closeConsumer: false</c>, so the consumers
/// created during the iterations are not closed explicitly by the benchmark methods.
/// </remarks>
[SimpleJob(RunStrategy.Monitoring, launchCount: 1, warmupCount: 1, iterationCount: 100)]
public class RabbitMQConsumer
{
    // Placeholder broker settings; adjust to match the broker under test.
    private const string Username = "someuser";
    private const string Password = "somepassword";
    private const string VirtualHostName = "somevhost";
    private const string StreamName = "somestream";

    // Stream offset every consumer starts reading from.
    private const ulong Offset = 321;

    // Only one of these is initialised per benchmark because each [GlobalSetup] is
    // bound to a single target method; the other one stays null.
    private IConnection _connection;
    private StreamSystem _streamSystem;

    /// <summary>
    /// Closes whichever connection the targeted setup created.
    /// </summary>
    /// <remarks>
    /// Each resource is closed independently and guarded against null, so an unset or failing
    /// AMQP connection can no longer prevent the stream system from being closed. Cleanup stays
    /// best-effort: failures are reported on stderr and never rethrown.
    /// </remarks>
    [GlobalCleanup]
    public async Task Cleanup()
    {
        try
        {
            _connection?.Close();
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Closing the AMQP connection failed: {ex.Message}");
        }

        try
        {
            if (_streamSystem is not null)
            {
                await _streamSystem.Close();
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Closing the stream system failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Creates the <see cref="StreamSystem"/> used by <see cref="StreamProtocolConsumer"/>.
    /// </summary>
    [GlobalSetup(Target = nameof(StreamProtocolConsumer))]
    public async Task SetupStream()
    {
        var streamConnectionConfig = new StreamSystemConfig
        {
            UserName = Username,
            Password = Password,
            VirtualHost = VirtualHostName,
            Endpoints = new List<EndPoint> { new DnsEndPoint("localhost", 5552) },
            ClientProvidedName = "streamconnection",
        };

        _streamSystem = await StreamSystem.Create(streamConnectionConfig);
    }

    /// <summary>
    /// Opens the AMQP connection used by <see cref="AmqpStreamConsumer"/>.
    /// </summary>
    [GlobalSetup(Target = nameof(AmqpStreamConsumer))]
    public Task SetupAmqp()
    {
        var connectionFactory = new ConnectionFactory
        {
            VirtualHost = VirtualHostName,
            UserName = Username,
            Password = Password,
            AutomaticRecoveryEnabled = true,
            TopologyRecoveryEnabled = true,
            // Required so that AsyncDefaultBasicConsumer-based consumers receive deliveries.
            DispatchConsumersAsync = true,
        };

        // NOTE(review): 6672 is not the default AMQP port (5672) - confirm it matches the broker.
        _connection = connectionFactory.CreateConnection(new List<AmqpTcpEndpoint>
        {
            new("localhost", 6672)
        });

        return Task.CompletedTask;
    }

    /// <summary>
    /// Baseline: time until the first message arrives when consuming the stream over AMQP.
    /// </summary>
    [Benchmark(Baseline = true)]
    public async Task AmqpStreamConsumer()
    {
        await WaitForFirstMessageUsingAmqp(false);
    }

    /// <summary>
    /// Time until the first message arrives when consuming through the stream protocol client.
    /// </summary>
    [Benchmark]
    public async Task StreamProtocolConsumer()
    {
        await WaitForFirstMessageUsingStreamProtocol(false);
    }

    /// <summary>
    /// Opens a channel, starts a consumer on the stream at <see cref="Offset"/> and waits
    /// for the first delivery.
    /// </summary>
    /// <param name="closeConsumer">
    /// When true, the channel is closed on a background task after the first delivery;
    /// when false, the channel is left open.
    /// </param>
    private async Task WaitForFirstMessageUsingAmqp(bool closeConsumer)
    {
        var model = _connection.CreateModel();

        // Prefetch of 1 together with manual acks (autoAck: false below); RabbitMQ requires
        // a prefetch limit and manual acknowledgements for stream consumers over AMQP.
        model.BasicQos(0, 1, false);

        var consumer = new MyConsumer();
        var arguments = new Dictionary<string, object>
        {
            // Sent as a 64-bit integer so offsets above int.MaxValue are not truncated.
            ["x-stream-offset"] = (long)Offset,
        };

        model.BasicConsume(StreamName, false, "myconsumer", false, false, arguments, consumer);

        await consumer.Tcs.Task;

        if (closeConsumer)
        {
            // Fire and forget: closing the channel must not be part of the measurement.
            _ = Task.Run(() => model.Close());
        }
    }

    /// <summary>
    /// Creates a raw stream consumer at <see cref="Offset"/> and waits for the first message.
    /// </summary>
    /// <param name="closeConsumer">
    /// When true, the consumer is closed on a background task after the first message;
    /// when false, the consumer is left open.
    /// </param>
    private async Task WaitForFirstMessageUsingStreamProtocol(bool closeConsumer)
    {
        var firstMessage = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var config = new RawConsumerConfig(StreamName)
        {
            ClientProvidedName = "someclient",
            // A fresh reference per invocation so consumers from different iterations do not share one.
            Reference = Guid.NewGuid().ToString(),
            OffsetSpec = new OffsetTypeOffset(Offset),
            IsSingleActiveConsumer = false,
            // TrySetResult: later messages must not throw once the first one has completed the task.
            MessageHandler = (_, _, _) =>
            {
                firstMessage.TrySetResult();
                return Task.CompletedTask;
            }
        };

        var consumer = await _streamSystem.CreateRawConsumer(config);

        await firstMessage.Task;

        if (closeConsumer)
        {
            // Fire and forget: closing the consumer must not be part of the measurement.
            _ = Task.Run(() => consumer.Close());
        }
    }

    /// <summary>
    /// AMQP consumer whose <see cref="Tcs"/> task completes on the first delivery it receives.
    /// </summary>
    private sealed class MyConsumer : AsyncDefaultBasicConsumer
    {
        /// <summary>Completed (once) when the first message is delivered.</summary>
        public TaskCompletionSource Tcs { get; } =
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>
        /// Signals <see cref="Tcs"/>; the message itself is neither inspected nor acknowledged.
        /// </summary>
        public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
            string exchange, string routingKey,
            IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            Tcs.TrySetResult();
            return Task.CompletedTask;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment