Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Azure Service Bus Batching Speed Test
// (created in LINQPad with the following references and namespaces)
// <Query Kind="Statements">
// <Reference>&lt;RuntimeDirectory&gt;\System.Runtime.Serialization.dll</Reference>
// <NuGetReference>WindowsAzure.ServiceBus</NuGetReference>
// <Namespace>Microsoft.ServiceBus</Namespace>
// <Namespace>Microsoft.ServiceBus.Messaging</Namespace>
// </Query>
string connectionString = Util.GetPassword("Test Azure Service Bus Connection String");
const string queueName = "MarkHeathTestQueue";
// PART 1 - CREATE THE QUEUE
var namespaceManager =
NamespaceManager.CreateFromConnectionString(connectionString);
if (namespaceManager.QueueExists(queueName))
{
namespaceManager.DeleteQueue(queueName);
}
namespaceManager.CreateQueue(queueName);
var queueDescription = namespaceManager.GetQueue(queueName);
if (queueDescription.MessageCount != 0)
{
throw new InvalidOperationException($"{queueName} contains {queueDescription.MessageCount} messages");
}
// PART 2 - SEND MESSAGES
const int messages = 1000;
var stopwatch = new Stopwatch();
const bool useBatching = false;
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
stopwatch.Start();
if (useBatching)
{
client.SendBatch(Enumerable.Range(0, messages).Select(n =>
{
var body = $"Hello World, this is message {n}";
var message = new BrokeredMessage(body);
message.Properties["From"] = "Mark Heath";
return message;
}));
}
else
{
for (int n = 0; n < messages; n++)
{
var body = $"Hello World, this is message {n}";
var message = new BrokeredMessage(body);
message.Properties["From"] = "Mark Heath";
client.Send(message);
}
}
Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms to send {messages} messages");
stopwatch.Reset();
// PART 3 - RECEIVE MESSAGES
stopwatch.Start();
int received = 0;
if (useBatching)
{
while (received < messages)
{
var rx = client.ReceiveBatch(messages, TimeSpan.FromSeconds(5)).ToList();
Console.WriteLine("Received a batch of {0}", rx.Count);
if (rx.Count > 0)
{
client.CompleteBatch(rx.Select(m => m.LockToken));
received += rx.Count;
}
}
}
else
{
while (received < messages)
{
var message = client.Receive();
if (message == null) break;
received++;
message.Complete();
}
}
Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms to receive {received} messages");
stopwatch.Reset();
queueDescription = namespaceManager.GetQueue(queueName);
if (queueDescription.MessageCount != 0)
{
throw new InvalidOperationException($"{queueName} contains {queueDescription.MessageCount} messages");
}
Console.WriteLine("Done");
// (created in LINQPad with the following references and namespaces)
//<Query Kind="FSharpProgram">
// <Reference>&lt;RuntimeDirectory&gt;\System.Runtime.Serialization.dll</Reference>
// <NuGetReference>WindowsAzure.ServiceBus</NuGetReference>
// <Namespace>Microsoft.ServiceBus</Namespace>
// <Namespace>Microsoft.ServiceBus.Messaging</Namespace>
//</Query>
let timed fn action =
let s = new Stopwatch()
printfn "Starting %s" action
s.Start()
fn()
printfn "Took %dms %s" s.ElapsedMilliseconds action
let ensureQueueExists (nm:NamespaceManager) qn =
if not (nm.QueueExists qn) then
nm.CreateQueue qn |> ignore
let ensureQueueIsEmpty (nm:NamespaceManager) qn =
let qd = nm.GetQueue qn
if qd.MessageCountDetails.ActiveMessageCount > 0L then
failwithf "%s has %d messages" qn qd.MessageCountDetails.ActiveMessageCount
let makeMessage n =
let body = sprintf "Hello World, this is message %d" n
let message = new BrokeredMessage (body)
message.Properties.["From"] <- "Mark Heath"
message
let sendIndividually (qc:QueueClient) count =
let sendOne n = qc.Send (makeMessage n)
[1..count] |> List.iter sendOne
let receiveIndividually (qc:QueueClient) count =
let rx = [1..count]
|> Seq.map (fun _ -> qc.Receive())
|> Seq.takeWhile (fun bm -> not (bm = null))
|> Seq.map (fun bm -> bm.Complete())
|> Seq.length
printfn "Got %d messages" rx
let sendBatched (qc:QueueClient) count =
[1..count] |> List.map makeMessage |> qc.SendBatch
let rec receiveBatched (qc:QueueClient) batchSize runningTotal =
let timeout = TimeSpan.FromSeconds 5.0
let rx = qc.ReceiveBatch(batchSize, timeout) |> Seq.toArray
match rx with
| [||] ->
printfn "Empty batch, total received %d" runningTotal
| _ ->
let rxCount = rx.Length
printfn "Got batch of %d messages" rxCount
qc.CompleteBatch (rx |> Array.map (fun m -> m.LockToken))
let totalSoFar = runningTotal + rxCount
if totalSoFar < batchSize then
receiveBatched qc batchSize totalSoFar
let connectionString = Util.GetPassword "Test Azure Service Bus Connection String"
let nm = NamespaceManager.CreateFromConnectionString connectionString
let queueName = "MarkHeathTestQueue"
let messages = 1000
ensureQueueExists nm queueName
ensureQueueIsEmpty nm queueName
let client = QueueClient.CreateFromConnectionString (connectionString, queueName)
timed (fun () -> sendIndividually client messages) (sprintf "Sending %d messages individually" messages)
timed (fun () -> receiveIndividually client messages) "Receiving messages individually"
timed (fun () -> sendBatched client messages) (sprintf "Sending %d messages batched" messages)
timed (fun () -> receiveBatched client messages 0) "Receiving messages batched"
ensureQueueIsEmpty nm queueName
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment