Last active
June 3, 2024 19:47
-
-
Save neilgreatorex/63e163c933ca8b2836b987eeac85a7b4 to your computer and use it in GitHub Desktop.
RabbitMQ channel leak test class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace RabbitMqTestConsoleApp; | |
using System; | |
using System.Collections.Generic; | |
using RabbitMQ.Client; | |
class Program | |
{ | |
static async Task Main(string[] args) | |
{ | |
var connectionFactory = new ConnectionFactory | |
{ | |
HostName = "localhost", | |
UserName = "guest", | |
Password = "guest", | |
AutomaticRecoveryEnabled = true, | |
ClientProvidedName = "Channel leak test", | |
ContinuationTimeout = TimeSpan.FromSeconds(5) | |
}; | |
var connection = await connectionFactory.CreateConnectionAsync().ConfigureAwait(false); | |
connection.ConnectionBlocked += (sender, eventArgs) => Log("*** Connection blocked!"); | |
connection.ConnectionUnblocked += (sender, eventArgs) => Log("*** Connection unblocked!"); | |
connection.ConnectionShutdown += (sender, eventArgs) => Log("*** Connection shutdown!"); | |
await TestSimplePublish(connection).ConfigureAwait(false); | |
await TestMultipleChannelOpen(connection).ConfigureAwait(false); | |
await Do("close connection", async () => await connection.CloseAsync(TimeSpan.FromSeconds(15)).ConfigureAwait(false)).ConfigureAwait(false); | |
await Do("dispose connection", () => { connection.Dispose(); return Task.CompletedTask; }).ConfigureAwait(false); | |
} | |
private static async Task TestMultipleChannelOpen(IConnection connection) | |
{ | |
List<IChannel> channels = new List<IChannel>(); | |
List<Task> channelTasks = new List<Task>(); | |
for (var i = 1; i <= 5; i++) | |
{ | |
try | |
{ | |
channelTasks.Add(Do($"open channel {i}", async () => channels.Add(await connection.CreateChannelAsync().ConfigureAwait(false)))); | |
} | |
catch (TimeoutException e) | |
{ | |
Log($"*** Timeout exception for channel {i}"); | |
} | |
} | |
Task.WaitAll(channelTasks.ToArray()); | |
Log($"{channels.Count} channel references held"); | |
WaitForInput("check channel count, should be 1"); | |
WaitForInput("clear broker resource constraint, re-check channel count, should now be 0"); | |
channels.ForEach(async channel => await Do("abort channel", async () => await channel.AbortAsync().ConfigureAwait(false)).ConfigureAwait(false)); | |
channels.Clear(); | |
await Do("collect garbage", () => { GC.Collect(); return Task.CompletedTask; }).ConfigureAwait(false); | |
WaitForInput("check channel count again, should still be 0"); | |
} | |
private static async Task Do(string message, Func<Task> action) | |
{ | |
Log($"About to '{message}'"); | |
try | |
{ | |
await action().ConfigureAwait(false); | |
} | |
catch (TaskCanceledException) | |
{ | |
Log("Ignoring task cancelled exception"); | |
} | |
Log($"Finished '{message}'"); | |
} | |
private static void Log(string message) | |
{ | |
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}: {message}"); | |
} | |
private static void WaitForInput(string message) | |
{ | |
Console.WriteLine($"Please {message} and then press any key to continue..."); | |
Console.ReadKey(); | |
} | |
private static async Task TestSimplePublish(IConnection connection) | |
{ | |
IChannel? model = null; | |
await Do("open publish channel", async () => model = await connection.CreateChannelAsync().ConfigureAwait(false)).ConfigureAwait(false); | |
await Do("declare exchange", async () => await model!.ExchangeDeclareAsync("test_exchange", "direct", autoDelete: true).ConfigureAwait(false)).ConfigureAwait(false); | |
await Do("publish message", async () => await model!.BasicPublishAsync("test_exchange", "test").ConfigureAwait(false)).ConfigureAwait(false); | |
await Do("close publish channel", async () => await model!.CloseAsync().ConfigureAwait(false)).ConfigureAwait(false); | |
await Do("dispose publish channel", () => { model!.Dispose(); return Task.CompletedTask; }).ConfigureAwait(false); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment