Skip to content

Instantly share code, notes, and snippets.

@dili91
Last active January 12, 2024 15:10
Show Gist options
  • Save dili91/343948a5e8289b617ff8c855b3a3f8d1 to your computer and use it in GitHub Desktop.
Save dili91/343948a5e8289b617ff8c855b3a3f8d1 to your computer and use it in GitHub Desktop.
A test for concurrency behaviour on EasyNetQ while consuming messages
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Topology;
using Xunit.Abstractions;
namespace easynetq_concurrency_tests;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using System.Text.Json;
/// <summary>
/// Integration tests that observe whether EasyNetQ dispatches messages to a blocking handler
/// concurrently or sequentially for different <c>PrefetchCount</c> /
/// <c>ConsumerDispatcherConcurrency</c> settings.
/// </summary>
/// <remarks>
/// The tests connect to a RabbitMQ broker on <c>localhost:2765</c> with the <c>guest</c> account.
/// Every handler records a <see cref="EventTrack.ConsumingStep.Start"/> and an
/// <see cref="EventTrack.ConsumingStep.End"/> entry per message; consumption is considered
/// sequential when the recorded history is exactly "Start(0), End(0), Start(1), End(1), ...".
/// </remarks>
public class ConcurrencyTests
{
    private const string ExchangeName = "myExchange";
    private const string QueueName = "myQueue";
    private const string RoutingKey = "*";

    // How long each handler blocks its thread; also the unit used to size the wait timeout.
    private static readonly TimeSpan BlockingTaskDuration = TimeSpan.FromMilliseconds(100);

    private readonly ITestOutputHelper _testOutputHelper;

    // Start/End entries in the order the handlers produced them.
    private readonly ConcurrentQueue<EventTrack> _eventHistory = new();

    public ConcurrencyTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    /// <summary>Typed <c>Advanced.Consume</c> with a synchronous handler and concurrency-friendly settings.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Should_Consume_Concurrently(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        // The bus is disposed when the test ends, whether it passes or fails.
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);
        // Disposing the consumer (even on assertion failure) is required to stop it
        // stealing messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume<TestEvent>(queue, (m, _) =>
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m.Body));
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            await bus.Advanced.PublishAsync(exchange, RoutingKey, true, new Message<TestEvent>(m));
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedConcurrently(messages.Count);
    }

    /// <summary>Raw (byte payload) <c>Advanced.Consume</c> with a synchronous handler and concurrency-friendly settings.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_Should_Consume_Concurrently(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);
        // Disposed on every exit path so it cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(queue, (body, property, info) =>
        {
            var m = JsonSerializer.Deserialize<TestEvent>(body.Span)!;
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m);
        });
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            var body = new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m));
            await bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                body);
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedConcurrently(messages.Count);
    }

    /// <summary>
    /// Raw <c>Advanced.Consume</c> with a Task-returning delegate that blocks synchronously:
    /// expects a sequential history even though the settings would allow concurrency.
    /// </summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_With_Delegate_Should_Consume_Sequentially_Even_If_PrefetchCountAndConsumerDispatchAllowConcurrency(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);
        // Disposed on every exit path so it cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(queue, new TestMessageHandlerDelegateWrapper(_testOutputHelper, _eventHistory).HandleMessageAsync, _ => { });
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            var body = new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m));
            await bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                body);
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedSequentially(messages.Count);
    }

    /// <summary>Typed <c>Advanced.Consume</c> with settings expected to force one-at-a-time handling.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);
        // Disposed on every exit path so it cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume<TestEvent>(queue, (m, _) =>
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m.Body));
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            await bus.Advanced.PublishAsync(exchange, RoutingKey, true, new Message<TestEvent>(m));
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedSequentially(messages.Count);
    }

    /// <summary>Raw <c>Advanced.Consume</c> with settings expected to force one-at-a-time handling.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);
        // Disposed on every exit path so it cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(queue, (body, property, info) =>
        {
            var m = JsonSerializer.Deserialize<TestEvent>(body.Span)!;
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m);
        });
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            var body = new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m));
            await bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                body);
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedSequentially(messages.Count);
    }

    /// <summary><c>PubSub.SubscribeAsync</c> with a synchronous handler and concurrency-friendly settings.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_SubscribeAsync_Should_Consume_Concurrently(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        // A fresh subscription id per run keeps this test's subscription separate from other runs.
        await bus.PubSub.SubscribeAsync<TestEvent>(Guid.NewGuid().ToString(),
            e => new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(e));
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            await bus.PubSub.PublishAsync(m);
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedConcurrently(messages.Count);
    }

    /// <summary><c>PubSub.SubscribeAsync</c> with settings expected to force one-at-a-time handling.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_SubscribeAsync_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        // A fresh subscription id per run keeps this test's subscription separate from other runs.
        await bus.PubSub.SubscribeAsync<TestEvent>(Guid.NewGuid().ToString(),
            e => new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(e));
        var messages = CreateMessages(noOfMessages);

        // Act
        foreach (var m in messages)
        {
            await bus.PubSub.PublishAsync(m);
        }

        WaitUntilAllConsumed(messages.Count);

        // Assert
        AssertConsumedSequentially(messages.Count);
    }

    /// <summary>Settings under which the tests expect overlapping handler executions.</summary>
    public static IEnumerable<object[]> ConcurrentConsumptionTestData()
    {
        //prefetchCount, consumerDispatchConcurrency, noOfMessages
        yield return new object[] { 50, 30, 10 };
        yield return new object[] { 50, 100, 50 };
        yield return new object[] { 50, 3, 2 };
    }

    /// <summary>Settings (prefetch of 1, or dispatch concurrency of 1) under which the tests expect strictly serial handling.</summary>
    public static IEnumerable<object[]> SequentialConsumptionTestData()
    {
        //prefetchCount, consumerDispatchConcurrency, noOfMessages
        yield return new object[] { 1, 10, 10 };
        yield return new object[] { 10, 1, 10 };
    }

    /// <summary>Builds the broker connection settings shared by every test, varying only the two knobs under test.</summary>
    private static ConnectionConfiguration CreateConnectionConfiguration(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency)
    {
        return new ConnectionConfiguration
        {
            Hosts = new List<HostConfiguration>() { new() { Host = "localhost", Port = 2765 } },
            VirtualHost = "/",
            ConsumerDispatcherConcurrency = consumerDispatchConcurrency,
            PrefetchCount = prefetchCount,
            UserName = "guest",
            Password = "guest",
        };
    }

    /// <summary>Creates <paramref name="noOfMessages"/> events with ids 0..n-1, i.e. in publish order.</summary>
    private static List<TestEvent> CreateMessages(ushort noOfMessages)
    {
        return Enumerable.Range(0, noOfMessages)
            .Select(i => new TestEvent { Id = (ushort)i })
            .ToList();
    }

    /// <summary>
    /// Blocks until a Start and an End entry were recorded for every message, or until the timeout
    /// of twice the fully-serial handling time (2 x messageCount x <see cref="BlockingTaskDuration"/>) elapses.
    /// </summary>
    private void WaitUntilAllConsumed(int messageCount)
    {
        var expectedEntries = 2 * messageCount;
        var timeout = BlockingTaskDuration.Multiply(messageCount).Multiply(2);

        // The result is intentionally ignored: a timeout surfaces through the count assertions that follow.
        SpinWait.SpinUntil(() => _eventHistory.Count == expectedEntries, timeout);
    }

    /// <summary>Asserts that every message was handled and that at least two handler executions were interleaved.</summary>
    private void AssertConsumedConcurrently(int messageCount)
    {
        var observed = SnapshotCompleteHistory(messageCount);

        observed.Should().NotBeEquivalentTo(InSequentialOrder(observed), x => x.WithStrictOrdering());
    }

    /// <summary>Asserts that every message was handled and that handlers ran strictly one after another, in id order.</summary>
    private void AssertConsumedSequentially(int messageCount)
    {
        var observed = SnapshotCompleteHistory(messageCount);

        observed.Should().BeEquivalentTo(InSequentialOrder(observed), x => x.WithStrictOrdering());
    }

    /// <summary>
    /// Takes a single snapshot of the history (so all assertions look at the same data) and
    /// verifies it holds exactly one Start and one End entry per message.
    /// </summary>
    private EventTrack[] SnapshotCompleteHistory(int messageCount)
    {
        var observed = _eventHistory.ToArray();

        observed.Length.Should().Be(2 * messageCount);
        observed.Count(x => x.Step == EventTrack.ConsumingStep.Start).Should().Be(messageCount);

        return observed;
    }

    /// <summary>Returns the history a purely sequential consumer would have produced: Start/End pairs ordered by event id.</summary>
    private static EventTrack[] InSequentialOrder(IEnumerable<EventTrack> history)
    {
        return history
            .OrderBy(e => e.Event.Id)
            .ThenBy(e => e.Step)
            .ToArray();
    }

    /// <summary>Synchronous handler that records Start/End entries around a thread-blocking delay.</summary>
    public class TestMessageHandler
    {
        private readonly ITestOutputHelper _testOutputHelper;
        private readonly ConcurrentQueue<EventTrack> _eventHistory;

        public TestMessageHandler(ITestOutputHelper testOutputHelper, ConcurrentQueue<EventTrack> eventHistory)
        {
            _testOutputHelper = testOutputHelper;
            _eventHistory = eventHistory;
        }

        /// <summary>Delegate that logs and records the start of handling, blocks, then logs and records the end.</summary>
        public Action<TestEvent> Handle => e =>
        {
            _testOutputHelper.WriteLine("Start consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.Start));

            // This delay simulates CPU intensive operations, blocking the current thread
            Thread.Sleep(BlockingTaskDuration);

            _testOutputHelper.WriteLine("End consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.End));
        };
    }

    /// <summary>
    /// Exposes the same blocking behaviour as <see cref="TestMessageHandler"/> through the
    /// Task-returning raw-message delegate signature accepted by <c>Advanced.Consume</c>.
    /// </summary>
    public class TestMessageHandlerDelegateWrapper
    {
        private readonly ITestOutputHelper _testOutputHelper;
        private readonly ConcurrentQueue<EventTrack> _eventHistory;

        public TestMessageHandlerDelegateWrapper(ITestOutputHelper testOutputHelper, ConcurrentQueue<EventTrack> eventHistory)
        {
            _testOutputHelper = testOutputHelper;
            _eventHistory = eventHistory;
        }

        /// <summary>
        /// Deserializes the JSON payload, handles it with the blocking <see cref="TestMessageHandler"/>
        /// and acknowledges the message.
        /// </summary>
        /// <remarks>
        /// Deliberately contains no <c>await</c>: the whole body runs synchronously on the calling
        /// thread before the task is returned, which is the scenario the delegate test observes.
        /// </remarks>
        public async Task<AckStrategy> HandleMessageAsync(
            ReadOnlyMemory<byte> body,
            MessageProperties properties,
            MessageReceivedInfo receivedInfo,
            CancellationToken cancellationToken)
        {
            var e = JsonSerializer.Deserialize<TestEvent>(body.Span)!;

            // Same Start / blocking delay / End sequence as the synchronous handler.
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(e);

            return AckStrategies.Ack;
        }
    }

    /// <summary>Message payload; <see cref="Id"/> is the position in which the test published it.</summary>
    public sealed class TestEvent
    {
        public ushort Id { get; set; }
    }

    /// <summary>One entry of the consumption history: which event, and whether its handling started or ended.</summary>
    public record EventTrack(TestEvent Event, EventTrack.ConsumingStep Step)
    {
        // Start sorts before End, which the sequential-order comparison relies on.
        public enum ConsumingStep
        {
            Start = 0, End = 1
        }
    }
}
<Project>
<!-- Shared MSBuild properties. NOTE(review): presumably a Directory.Build.props imported by the test project; confirm the file name. -->
<PropertyGroup>
<!-- Turns on nullable reference type analysis. -->
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>
<!-- Package versions, consumed as $(PropertyName) by the PackageReference items of the test project. -->
<EasyNetQLibraryVersion>7.8.0</EasyNetQLibraryVersion>
<MicrosoftExtensionLibraryVersion>3.1.8</MicrosoftExtensionLibraryVersion>
<!-- Testing libraries -->
<FluentAssertionsLibraryVersion>6.12.0</FluentAssertionsLibraryVersion>
</PropertyGroup>
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<!-- Project file for the EasyNetQ concurrency tests (root namespace easynetq_concurrency_tests). -->
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<RootNamespace>easynetq_concurrency_tests</RootNamespace>
<!-- NOTE(review): the test source uses List<T> and IEnumerable<T> without an explicit using, so it presumably relies on the implicit usings enabled here; confirm before disabling. -->
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<!-- Versions written as $(...) are MSBuild properties that are not defined in this file; they must be supplied by an imported props file. -->
<PackageReference Include="EasyNetQ" Version="$(EasyNetQLibraryVersion)" />
<PackageReference Include="EasyNetQ.Serialization.NewtonsoftJson" Version="$(EasyNetQLibraryVersion)" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsLibraryVersion)" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftExtensionLibraryVersion)" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="$(MicrosoftExtensionLibraryVersion)" />
<!-- Test infrastructure: test SDK, xUnit framework and the Visual Studio runner (pinned inline rather than via properties). -->
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment