Skip to content

Instantly share code, notes, and snippets.

@neilgreatorex
Last active June 3, 2024 19:47
Show Gist options
  • Save neilgreatorex/63e163c933ca8b2836b987eeac85a7b4 to your computer and use it in GitHub Desktop.
Save neilgreatorex/63e163c933ca8b2836b987eeac85a7b4 to your computer and use it in GitHub Desktop.
RabbitMQ channel leak test class
namespace RabbitMqTestConsoleApp;
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
/// <summary>
/// Interactive console harness for checking RabbitMQ channel counts: it publishes one
/// message on a short-lived channel, then opens several channels concurrently and asks
/// the operator to compare the broker's channel count at a number of checkpoints.
/// </summary>
class Program
{
    /// <summary>
    /// Connects to the broker on localhost with the guest account, runs both test
    /// scenarios, then closes and disposes the connection.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        var connectionFactory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest",
            AutomaticRecoveryEnabled = true,
            ClientProvidedName = "Channel leak test",
            ContinuationTimeout = TimeSpan.FromSeconds(5)
        };

        var connection = await connectionFactory.CreateConnectionAsync().ConfigureAwait(false);

        // Surface connection state changes in the log so they can be correlated with the test steps.
        connection.ConnectionBlocked += (sender, eventArgs) => Log("*** Connection blocked!");
        connection.ConnectionUnblocked += (sender, eventArgs) => Log("*** Connection unblocked!");
        connection.ConnectionShutdown += (sender, eventArgs) => Log("*** Connection shutdown!");

        await TestSimplePublish(connection).ConfigureAwait(false);
        await TestMultipleChannelOpen(connection).ConfigureAwait(false);

        await Do("close connection", async () => await connection.CloseAsync(TimeSpan.FromSeconds(15)).ConfigureAwait(false)).ConfigureAwait(false);
        await Do("dispose connection", () => { connection.Dispose(); return Task.CompletedTask; }).ConfigureAwait(false);
    }

    /// <summary>
    /// Starts five channel-open operations concurrently, waits for all of them to finish,
    /// then aborts every channel that was obtained and forces a garbage collection,
    /// pausing for operator verification between phases.
    /// </summary>
    /// <param name="connection">The open connection to create channels on.</param>
    private static async Task TestMultipleChannelOpen(IConnection connection)
    {
        List<IChannel> channels = new List<IChannel>();
        List<Task> channelTasks = new List<Task>();

        // Awaiting inside the helper means the TimeoutException handler actually guards the
        // asynchronous open; a try/catch around a non-awaited call only sees synchronous throws.
        async Task OpenChannel(int index)
        {
            try
            {
                await Do($"open channel {index}", async () =>
                {
                    IChannel channel = await connection.CreateChannelAsync().ConfigureAwait(false);

                    // The opens complete concurrently and List<T> is not thread-safe.
                    lock (channels)
                    {
                        channels.Add(channel);
                    }
                }).ConfigureAwait(false);
            }
            catch (TimeoutException)
            {
                Log($"*** Timeout exception for channel {index}");
            }
        }

        // Start every open without awaiting so the requests are in flight at the same time.
        for (var i = 1; i <= 5; i++)
        {
            channelTasks.Add(OpenChannel(i));
        }

        await Task.WhenAll(channelTasks).ConfigureAwait(false);

        Log($"{channels.Count} channel references held");
        WaitForInput("check channel count, should be 1");
        WaitForInput("clear broker resource constraint, re-check channel count, should now be 0");

        // Await each abort so it has finished before the references are dropped and the GC runs.
        foreach (IChannel channel in channels)
        {
            await Do("abort channel", async () => await channel.AbortAsync().ConfigureAwait(false)).ConfigureAwait(false);
        }

        channels.Clear();
        await Do("collect garbage", () => { GC.Collect(); return Task.CompletedTask; }).ConfigureAwait(false);
        WaitForInput("check channel count again, should still be 0");
    }

    /// <summary>
    /// Runs <paramref name="action"/> with log lines before and after it. A
    /// <see cref="TaskCanceledException"/> is logged and swallowed; any other exception propagates.
    /// </summary>
    /// <param name="message">Description of the step, used in the log output.</param>
    /// <param name="action">The asynchronous step to run.</param>
    private static async Task Do(string message, Func<Task> action)
    {
        Log($"About to '{message}'");
        try
        {
            await action().ConfigureAwait(false);
        }
        catch (TaskCanceledException)
        {
            Log("Ignoring task cancelled exception");
        }
        Log($"Finished '{message}'");
    }

    /// <summary>Writes <paramref name="message"/> to the console prefixed with a local timestamp.</summary>
    /// <param name="message">The text to log.</param>
    private static void Log(string message)
    {
        Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}: {message}");
    }

    /// <summary>Prompts the operator to perform a manual check and blocks until a key is pressed.</summary>
    /// <param name="message">The instruction shown after "Please".</param>
    private static void WaitForInput(string message)
    {
        Console.WriteLine($"Please {message} and then press any key to continue...");
        Console.ReadKey();
    }

    /// <summary>
    /// Opens a channel, declares the auto-delete direct exchange "test_exchange", publishes
    /// one message with routing key "test", then closes and disposes the channel.
    /// </summary>
    /// <param name="connection">The open connection to create the channel on.</param>
    private static async Task TestSimplePublish(IConnection connection)
    {
        IChannel? model = null;
        await Do("open publish channel", async () => model = await connection.CreateChannelAsync().ConfigureAwait(false)).ConfigureAwait(false);

        // Do swallows TaskCanceledException, so the open step can finish without assigning a channel.
        if (model is null)
        {
            Log("*** Publish channel was not opened, skipping simple publish test");
            return;
        }

        IChannel channel = model;
        await Do("declare exchange", async () => await channel.ExchangeDeclareAsync("test_exchange", "direct", autoDelete: true).ConfigureAwait(false)).ConfigureAwait(false);
        await Do("publish message", async () => await channel.BasicPublishAsync("test_exchange", "test").ConfigureAwait(false)).ConfigureAwait(false);
        await Do("close publish channel", async () => await channel.CloseAsync().ConfigureAwait(false)).ConfigureAwait(false);
        await Do("dispose publish channel", () => { channel.Dispose(); return Task.CompletedTask; }).ConfigureAwait(false);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment