// Amqp / Stream benchmark
// Source: GitHub gist jonnepmyra/ae40219ba6f7194f457fb24f3d68915f (created November 8, 2023 07:28).
// Note: the gist page flagged this file as containing bidirectional Unicode text; open it in an
// editor that reveals hidden Unicode characters if you need to review them.
using System.Net; | |
using BenchmarkDotNet.Attributes; | |
using BenchmarkDotNet.Engines; | |
using BenchmarkDotNet.Running; | |
using RabbitMQ.Client; | |
using RabbitMQ.Stream.Client; | |
// Program entry point: execute every [Benchmark] declared on RabbitMQConsumer.
// The summary object returned by the runner is not needed here, so it is discarded explicitly.
_ = BenchmarkRunner.Run<RabbitMQConsumer>();
/// <summary>
/// Measures how long it takes to receive the first message of a RabbitMQ stream when
/// attaching at a fixed offset, comparing a consumer that uses the AMQP client
/// (<see cref="AmqpStreamConsumer"/>, baseline) with one that uses the stream client
/// (<see cref="StreamProtocolConsumer"/>).
/// </summary>
/// <remarks>
/// Each benchmark has its own targeted <c>[GlobalSetup]</c>, so only one of
/// <c>_connection</c> / <c>_streamSystem</c> is initialized for a given benchmark;
/// <see cref="Cleanup"/> must therefore tolerate the other one being <c>null</c>.
/// The benchmarks call the helpers with <c>closeConsumer: false</c>, so consumers created
/// per iteration are left open until the owning connection is closed in <see cref="Cleanup"/>.
/// </remarks>
[SimpleJob(RunStrategy.Monitoring, launchCount: 1, warmupCount: 1, iterationCount: 100)]
public class RabbitMQConsumer
{
    // Set by SetupAmqp only; stays null when the stream-protocol benchmark runs.
    private IConnection _connection;

    // Set by SetupStream only; stays null when the AMQP benchmark runs.
    private StreamSystem _streamSystem;

    private readonly string _username = "someuser";
    private readonly string _password = "somepassword";
    private readonly string _vhost = "somevhost";
    private readonly string _streamName = "somestream";

    // Absolute stream offset both consumers attach at.
    private readonly ulong _offset = 321;

    /// <summary>
    /// Closes whichever connection the active benchmark opened. Each resource is closed
    /// independently so that a missing or failing one cannot prevent the other from closing.
    /// Failures are reported on stderr and never rethrown (cleanup is best-effort).
    /// </summary>
    [GlobalCleanup]
    public async Task Cleanup()
    {
        try
        {
            _connection?.Close();
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Failed to close AMQP connection: {ex.Message}");
        }

        if (_streamSystem != null)
        {
            try
            {
                await _streamSystem.Close();
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Failed to close stream system: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Opens the stream-client connection (localhost:5552) used by
    /// <see cref="StreamProtocolConsumer"/>.
    /// </summary>
    [GlobalSetup(Target = nameof(StreamProtocolConsumer))]
    public async Task SetupStream()
    {
        var streamConnectionConfig = new StreamSystemConfig
        {
            UserName = _username,
            Password = _password,
            VirtualHost = _vhost,
            Endpoints = new List<EndPoint> { new DnsEndPoint("localhost", 5552) },
            ClientProvidedName = "streamconnection",
        };

        _streamSystem = await StreamSystem.Create(streamConnectionConfig);
    }

    /// <summary>
    /// Opens the AMQP connection used by <see cref="AmqpStreamConsumer"/>.
    /// </summary>
    /// <returns>An already-completed task; the connection is created synchronously.</returns>
    [GlobalSetup(Target = nameof(AmqpStreamConsumer))]
    public Task SetupAmqp()
    {
        var connectionFactory = new ConnectionFactory
        {
            VirtualHost = _vhost,
            UserName = _username,
            Password = _password,
            AutomaticRecoveryEnabled = true,
            TopologyRecoveryEnabled = true,
            // Required because MyConsumer derives from AsyncDefaultBasicConsumer.
            DispatchConsumersAsync = true,
        };

        // NOTE(review): 6672 is not the conventional AMQP port (5672) - presumably a local
        // port mapping; confirm against the broker setup.
        _connection = connectionFactory.CreateConnection(new List<AmqpTcpEndpoint>
        {
            new("localhost", 6672)
        });

        return Task.CompletedTask;
    }

    /// <summary>Baseline: time to first message using the AMQP client.</summary>
    [Benchmark(Baseline = true)]
    public async Task AmqpStreamConsumer()
    {
        await WaitForFirstMessageUsingAmqp(false);
    }

    /// <summary>Time to first message using the stream client.</summary>
    [Benchmark]
    public async Task StreamProtocolConsumer()
    {
        await WaitForFirstMessageUsingStreamProtocol(false);
    }

    /// <summary>
    /// Creates a channel, starts consuming the stream at <c>_offset</c> and completes once
    /// the first delivery has been handed to the consumer.
    /// </summary>
    /// <param name="closeConsumer">
    /// When <c>true</c>, the channel is closed on a background task that is not awaited,
    /// so the close is not part of the measured time.
    /// </param>
    private async Task WaitForFirstMessageUsingAmqp(bool closeConsumer)
    {
        var model = _connection.CreateModel();
        model.BasicQos(0, 1, false);

        var consumer = new MyConsumer();
        var arguments = new Dictionary<string, object>
        {
            // Sent as a 64-bit integer so that offsets above int.MaxValue are not silently
            // truncated; checked so an offset that does not fit in a long fails loudly.
            { "x-stream-offset", checked((long)_offset) }
        };
        model.BasicConsume(_streamName, false, "myconsumer", false, false, arguments, consumer);

        await consumer.Tcs.Task;

        if (closeConsumer)
        {
            // Run in the background and do not await: close() must not be benchmarked.
            _ = Task.Run(() => model.Close());
        }
    }

    /// <summary>
    /// Creates a raw stream consumer attached at <c>_offset</c> and completes once its
    /// message handler has been invoked for the first time.
    /// </summary>
    /// <param name="closeConsumer">
    /// When <c>true</c>, the consumer is closed on a background task that is not awaited,
    /// so the close is not part of the measured time.
    /// </param>
    private async Task WaitForFirstMessageUsingStreamProtocol(bool closeConsumer)
    {
        var firstMessage = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var config = new RawConsumerConfig(_streamName)
        {
            ClientProvidedName = "someclient",
            Reference = Guid.NewGuid().ToString(),
            OffsetSpec = new OffsetTypeOffset(_offset),
            IsSingleActiveConsumer = false,
            // The handler arguments are irrelevant here; only the arrival of a message matters.
            MessageHandler = (_, _, _) =>
            {
                firstMessage.TrySetResult();
                return Task.CompletedTask;
            }
        };

        var consumer = await _streamSystem.CreateRawConsumer(config)
            .ConfigureAwait(false);

        await firstMessage.Task;

        if (closeConsumer)
        {
            // Run in the background and do not await: close() must not be benchmarked.
            _ = Task.Run(async () =>
            {
                await consumer.Close();
            });
        }
    }

    /// <summary>
    /// Async AMQP consumer whose <see cref="Tcs"/> completes on the first delivery it receives.
    /// </summary>
    private sealed class MyConsumer : AsyncDefaultBasicConsumer
    {
        /// <summary>Completed (at most once) when the first message is delivered.</summary>
        public TaskCompletionSource Tcs { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <inheritdoc />
        public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
            string exchange, string routingKey,
            IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            // TrySetResult keeps later deliveries from throwing once the task has completed.
            Tcs.TrySetResult();
            return Task.CompletedTask;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment