Skip to content

Instantly share code, notes, and snippets.

@lukeschafer
Created March 27, 2014 00:59
Show Gist options
  • Save lukeschafer/9797555 to your computer and use it in GitHub Desktop.
Save lukeschafer/9797555 to your computer and use it in GitHub Desktop.
SBWS / Azure Service Bus: a comparison of the concurrency achieved when receiving queued messages with the built-in message pump versus a custom message pump.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ConsoleApplication1
{
/// <summary>
/// Serializable payload type used as the body of the test messages sent to the queue.
/// </summary>
[Serializable]
public class Msg
{
    /// <summary>Free-form text carried by the message; left unset by the sender in this sample.</summary>
    public string Test { get; set; }
}
/// <summary>
/// Console harness that pushes a few messages onto a Service Bus queue and then receives them
/// with either a set of hand-rolled receive loops or the SDK's OnMessageAsync message pump,
/// so the concurrency of the two approaches can be compared from the console output.
/// </summary>
class Program
{
    /// <summary>
    /// Sends ten test messages to the queue and starts the receive strategy selected by
    /// <c>useCustomMessagePump</c>.
    /// </summary>
    static void Main(string[] args)
    {
        const string queueName = "TestQueue";
        const string connectionString = "SERVICE_BUS_CONNECTION_STRING";
        const bool useCustomMessagePump = true;
        var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
        for (var i = 0; i < 10; i++)
        {
            Send(client); //push some messages
        }
        if (useCustomMessagePump)
        {
            CustomListen(queueName, connectionString, client);
        }
        else
        {
            PumpListen(queueName, connectionString, client);
        }
    }

    /// <summary>
    /// Receives with one custom receive loop per processor until the user presses Enter,
    /// then signals the loops to stop.
    /// </summary>
    /// <param name="queueName">Not used by this strategy; kept so both strategies share a signature.</param>
    /// <param name="connectionString">Not used by this strategy; kept so both strategies share a signature.</param>
    /// <param name="client">Queue client the receive loops pull messages from.</param>
    public static void CustomListen(string queueName, string connectionString, QueueClient client)
    {
        var cancelMessagePumps = CreateMessagePumps(client, Environment.ProcessorCount, Handle);
        Console.ReadLine();
        cancelMessagePumps.Cancel();
    }

    /// <summary>
    /// Receives with the SDK's OnMessageAsync pump, allowing one concurrent call per processor,
    /// until the user presses Enter.
    /// </summary>
    /// <param name="queueName">Queue the message receiver is created for.</param>
    /// <param name="connectionString">Not used by this strategy; kept so both strategies share a signature.</param>
    /// <param name="client">Queue client whose messaging factory creates the receiver.</param>
    public static void PumpListen(string queueName, string connectionString, QueueClient client)
    {
        var receiver = client.MessagingFactory.CreateMessageReceiver(queueName);
        try
        {
            receiver.OnMessageAsync(Handle, new OnMessageOptions
            {
                // Handle completes each message itself, so the pump must not auto-complete it.
                AutoComplete = false,
                MaxConcurrentCalls = Environment.ProcessorCount
            });
            Console.ReadLine();
        }
        finally
        {
            // Release the receiver once the user asks to stop instead of leaking it.
            receiver.Close();
        }
    }

    /// <summary>
    /// Message handler shared by both strategies: logs receipt, waits two seconds,
    /// completes the message and logs completion.
    /// </summary>
    /// <param name="message">The received message to process and complete.</param>
    /// <returns>A task that finishes once the message has been completed and logged.</returns>
    public static async Task Handle(BrokeredMessage message)
    {
        Console.WriteLine("Got Message " + message.MessageId);
        // Asynchronous delay rather than Thread.Sleep so the calling thread is not blocked
        // while the simulated work is "running".
        await Task.Delay(2000); //long enough to be human-visible
        await message.CompleteAsync();
        Console.WriteLine("Done Message " + message.MessageId);
    }

    /// <summary>
    /// Sends a single test message with a fresh GUID as its message id.
    /// </summary>
    /// <param name="client">Queue client used to send the message.</param>
    public static void Send(QueueClient client)
    {
        // Send synchronously: the previous fire-and-forget SendAsync dropped the returned task,
        // so failures were never observed and listening could start before the sends finished.
        using (var message = new BrokeredMessage(new Msg()) { MessageId = Guid.NewGuid().ToString() })
        {
            client.Send(message);
        }
    }

    /// <summary>
    /// Starts <paramref name="count"/> background receive loops that each pull one message at a
    /// time from <paramref name="client"/> and block until <paramref name="doAction"/> has
    /// finished with it.
    /// </summary>
    /// <param name="client">Queue client the loops receive from.</param>
    /// <param name="count">Number of receive loops to start.</param>
    /// <param name="doAction">Handler invoked for every received message.</param>
    /// <returns>
    /// The cancellation source controlling the loops; cancelling it stops each loop after its
    /// current receive call returns and aborts any in-progress wait on a handler.
    /// </returns>
    public static CancellationTokenSource CreateMessagePumps(QueueClient client, int count, Func<BrokeredMessage, Task> doAction)
    {
        var cancellation = new CancellationTokenSource();
        var token = cancellation.Token;
        for (var i = 0; i < count; i++)
        {
            // LongRunning: each loop blocks in Receive/Wait for its whole lifetime, so it should
            // not occupy an ordinary thread-pool worker.
            Task.Factory.StartNew(() =>
            {
                // Loop on the token. The previous Thread.CurrentThread.IsAlive condition is always
                // true for the running thread, so cancellation never stopped an idle loop.
                while (!token.IsCancellationRequested)
                {
                    // Receive returns null when no message arrives within the timeout.
                    var msg = client.Receive(TimeSpan.FromSeconds(10));
                    if (msg != null)
                    {
                        doAction(msg).Wait(token);
                    }
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        return cancellation;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment