Created
March 27, 2014 00:59
-
-
Save lukeschafer/9797555 to your computer and use it in GitHub Desktop.
SBWS/AzureSB: a comparison of the concurrency achieved when receiving queued messages with the built-in message pump versus a custom message pump.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.ServiceBus.Messaging; | |
namespace ConsoleApplication1 | |
{ | |
/// <summary>
/// Payload object wrapped in each <c>BrokeredMessage</c> sent by this demo.
/// </summary>
[Serializable]
public class Msg
{
    /// <summary>Free-form text carried by the message; never assigned by the demo itself.</summary>
    public string Test { get; set; }
}
/// <summary>
/// Console demo: pushes ten messages onto a queue and then receives them either with a
/// hand-rolled set of receive loops (<see cref="CustomListen"/>) or with the SDK's
/// <c>OnMessageAsync</c> message pump (<see cref="PumpListen"/>).
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        const string queueName = "TestQueue";
        const string connectionString = "SERVICE_BUS_CONNECTION_STRING";
        // Flip to false to exercise the built-in message pump instead of the custom one.
        const bool useCustomMessagePump = true;

        var client = QueueClient.CreateFromConnectionString(connectionString, queueName);

        for (var i = 0; i < 10; i++)
        {
            Send(client); //push some messages
        }

        if (useCustomMessagePump)
        {
            CustomListen(queueName, connectionString, client);
        }
        else
        {
            PumpListen(queueName, connectionString, client);
        }
    }

    /// <summary>
    /// Receives messages with one custom pump per processor until the user presses Enter,
    /// then signals the pumps to stop.
    /// </summary>
    /// <param name="queueName">Unused here; kept so both listen methods share a signature.</param>
    /// <param name="connectionString">Unused here; kept so both listen methods share a signature.</param>
    /// <param name="client">Queue client the pumps receive from.</param>
    public static void CustomListen(string queueName, string connectionString, QueueClient client)
    {
        var cancelMessagePumps = CreateMessagePumps(client, Environment.ProcessorCount, Handle);
        Console.ReadLine();
        cancelMessagePumps.Cancel();
    }

    /// <summary>
    /// Receives messages through the SDK's <c>OnMessageAsync</c> pump, allowing up to one
    /// concurrent handler call per processor, until the user presses Enter.
    /// </summary>
    /// <param name="queueName">Name of the queue the receiver is created for.</param>
    /// <param name="connectionString">Unused here; kept so both listen methods share a signature.</param>
    /// <param name="client">Queue client whose messaging factory creates the receiver.</param>
    public static void PumpListen(string queueName, string connectionString, QueueClient client)
    {
        var receiver = client.MessagingFactory.CreateMessageReceiver(queueName);
        receiver.OnMessageAsync(Handle, new OnMessageOptions
        {
            // Handle completes each message itself, so the pump must not auto-complete.
            AutoComplete = false,
            MaxConcurrentCalls = Environment.ProcessorCount
        });
        Console.ReadLine();
    }

    /// <summary>
    /// Message handler shared by both pumps: logs receipt, waits two seconds, completes the
    /// message and logs completion.
    /// </summary>
    /// <param name="message">The received message to process and complete.</param>
    /// <returns>A task that finishes once the message has been completed.</returns>
    public static async Task Handle(BrokeredMessage message)
    {
        Console.WriteLine("Got Message " + message.MessageId);
        // NOTE(review): this blocks the calling thread rather than awaiting; presumably
        // deliberate to simulate synchronous work for the concurrency comparison - confirm
        // before replacing it with Task.Delay.
        Thread.Sleep(2000); //long enough to be human-visible
        await message.CompleteAsync();
        Console.WriteLine("Done Message " + message.MessageId);
    }

    /// <summary>
    /// Sends one message with a fresh GUID as its <c>MessageId</c>.
    /// </summary>
    /// <remarks>
    /// Uses the synchronous send so that a failure surfaces to the caller and the message is
    /// on the queue when this method returns, instead of starting a send task nobody observes.
    /// </remarks>
    /// <param name="client">Queue client to send through.</param>
    public static void Send(QueueClient client)
    {
        using (var message = new BrokeredMessage(new Msg()) { MessageId = Guid.NewGuid().ToString() })
        {
            client.Send(message);
        }
    }

    /// <summary>
    /// Starts <paramref name="count"/> background receive loops that each pull messages from
    /// <paramref name="client"/> and run <paramref name="doAction"/> on them one at a time.
    /// </summary>
    /// <param name="client">Queue client the loops receive from.</param>
    /// <param name="count">Number of receive loops to start.</param>
    /// <param name="doAction">Handler invoked for every received message; each loop waits for it to finish.</param>
    /// <returns>
    /// The cancellation source controlling the loops. After <c>Cancel()</c>, a loop that is
    /// waiting on a handler stops immediately; an idle loop stops once its current receive
    /// call returns.
    /// </returns>
    public static CancellationTokenSource CreateMessagePumps(QueueClient client, int count, Func<BrokeredMessage, Task> doAction)
    {
        var cancellation = new CancellationTokenSource();
        var token = cancellation.Token;

        for (var i = 0; i < count; i++)
        {
            // Each pump is a long-lived blocking loop, so hint the scheduler to give it a
            // dedicated thread instead of occupying a regular pool worker.
            Task.Factory.StartNew(() =>
            {
                // Loop on the token: the running thread is always "alive", so checking
                // Thread.CurrentThread.IsAlive would never let cancellation end the loop.
                while (!token.IsCancellationRequested)
                {
                    // A null result means no message was returned within the 10 second wait.
                    var msg = client.Receive(TimeSpan.FromSeconds(10));
                    if (msg != null)
                    {
                        // Throws OperationCanceledException if cancelled mid-handler,
                        // which ends this pump's task.
                        doAction(msg).Wait(token);
                    }
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        return cancellation;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment