Last active
October 31, 2016 17:47
-
-
Save markheath/84922e7f9ba1ffd8aeccb1ba8b58e010 to your computer and use it in GitHub Desktop.
Azure Service Bus Batching Speed Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// (created in LINQPad with the following references and namespaces) | |
// <Query Kind="Statements"> | |
// <Reference><RuntimeDirectory>\System.Runtime.Serialization.dll</Reference> | |
// <NuGetReference>WindowsAzure.ServiceBus</NuGetReference> | |
// <Namespace>Microsoft.ServiceBus</Namespace> | |
// <Namespace>Microsoft.ServiceBus.Messaging</Namespace> | |
// </Query> | |
string connectionString = Util.GetPassword("Test Azure Service Bus Connection String"); | |
const string queueName = "MarkHeathTestQueue"; | |
// PART 1 - CREATE THE QUEUE | |
var namespaceManager = | |
NamespaceManager.CreateFromConnectionString(connectionString); | |
if (namespaceManager.QueueExists(queueName)) | |
{ | |
namespaceManager.DeleteQueue(queueName); | |
} | |
namespaceManager.CreateQueue(queueName); | |
var queueDescription = namespaceManager.GetQueue(queueName); | |
if (queueDescription.MessageCount != 0) | |
{ | |
throw new InvalidOperationException($"{queueName} contains {queueDescription.MessageCount} messages"); | |
} | |
// PART 2 - SEND MESSAGES | |
const int messages = 1000; | |
var stopwatch = new Stopwatch(); | |
const bool useBatching = false; | |
var client = QueueClient.CreateFromConnectionString(connectionString, queueName); | |
stopwatch.Start(); | |
if (useBatching) | |
{ | |
client.SendBatch(Enumerable.Range(0, messages).Select(n => | |
{ | |
var body = $"Hello World, this is message {n}"; | |
var message = new BrokeredMessage(body); | |
message.Properties["From"] = "Mark Heath"; | |
return message; | |
})); | |
} | |
else | |
{ | |
for (int n = 0; n < messages; n++) | |
{ | |
var body = $"Hello World, this is message {n}"; | |
var message = new BrokeredMessage(body); | |
message.Properties["From"] = "Mark Heath"; | |
client.Send(message); | |
} | |
} | |
Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms to send {messages} messages"); | |
stopwatch.Reset(); | |
// PART 3 - RECEIVE MESSAGES | |
stopwatch.Start(); | |
int received = 0; | |
if (useBatching) | |
{ | |
while (received < messages) | |
{ | |
var rx = client.ReceiveBatch(messages, TimeSpan.FromSeconds(5)).ToList(); | |
Console.WriteLine("Received a batch of {0}", rx.Count); | |
if (rx.Count > 0) | |
{ | |
client.CompleteBatch(rx.Select(m => m.LockToken)); | |
received += rx.Count; | |
} | |
} | |
} | |
else | |
{ | |
while (received < messages) | |
{ | |
var message = client.Receive(); | |
if (message == null) break; | |
received++; | |
message.Complete(); | |
} | |
} | |
Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms to receive {received} messages"); | |
stopwatch.Reset(); | |
queueDescription = namespaceManager.GetQueue(queueName); | |
if (queueDescription.MessageCount != 0) | |
{ | |
throw new InvalidOperationException($"{queueName} contains {queueDescription.MessageCount} messages"); | |
} | |
Console.WriteLine("Done"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// (created in LINQPad with the following references and namespaces) | |
//<Query Kind="FSharpProgram"> | |
// <Reference><RuntimeDirectory>\System.Runtime.Serialization.dll</Reference> | |
// <NuGetReference>WindowsAzure.ServiceBus</NuGetReference> | |
// <Namespace>Microsoft.ServiceBus</Namespace> | |
// <Namespace>Microsoft.ServiceBus.Messaging</Namespace> | |
//</Query> | |
let timed fn action = | |
let s = new Stopwatch() | |
printfn "Starting %s" action | |
s.Start() | |
fn() | |
printfn "Took %dms %s" s.ElapsedMilliseconds action | |
let ensureQueueExists (nm:NamespaceManager) qn = | |
if not (nm.QueueExists qn) then | |
nm.CreateQueue qn |> ignore | |
let ensureQueueIsEmpty (nm:NamespaceManager) qn = | |
let qd = nm.GetQueue qn | |
if qd.MessageCountDetails.ActiveMessageCount > 0L then | |
failwithf "%s has %d messages" qn qd.MessageCountDetails.ActiveMessageCount | |
let makeMessage n = | |
let body = sprintf "Hello World, this is message %d" n | |
let message = new BrokeredMessage (body) | |
message.Properties.["From"] <- "Mark Heath" | |
message | |
let sendIndividually (qc:QueueClient) count = | |
let sendOne n = qc.Send (makeMessage n) | |
[1..count] |> List.iter sendOne | |
let receiveIndividually (qc:QueueClient) count = | |
let rx = [1..count] | |
|> Seq.map (fun _ -> qc.Receive()) | |
|> Seq.takeWhile (fun bm -> not (bm = null)) | |
|> Seq.map (fun bm -> bm.Complete()) | |
|> Seq.length | |
printfn "Got %d messages" rx | |
let sendBatched (qc:QueueClient) count = | |
[1..count] |> List.map makeMessage |> qc.SendBatch | |
let rec receiveBatched (qc:QueueClient) batchSize runningTotal = | |
let timeout = TimeSpan.FromSeconds 5.0 | |
let rx = qc.ReceiveBatch(batchSize, timeout) |> Seq.toArray | |
match rx with | |
| [||] -> | |
printfn "Empty batch, total received %d" runningTotal | |
| _ -> | |
let rxCount = rx.Length | |
printfn "Got batch of %d messages" rxCount | |
qc.CompleteBatch (rx |> Array.map (fun m -> m.LockToken)) | |
let totalSoFar = runningTotal + rxCount | |
if totalSoFar < batchSize then | |
receiveBatched qc batchSize totalSoFar | |
let connectionString = Util.GetPassword "Test Azure Service Bus Connection String" | |
let nm = NamespaceManager.CreateFromConnectionString connectionString | |
let queueName = "MarkHeathTestQueue" | |
let messages = 1000 | |
ensureQueueExists nm queueName | |
ensureQueueIsEmpty nm queueName | |
let client = QueueClient.CreateFromConnectionString (connectionString, queueName) | |
timed (fun () -> sendIndividually client messages) (sprintf "Sending %d messages individually" messages) | |
timed (fun () -> receiveIndividually client messages) "Receiving messages individually" | |
timed (fun () -> sendBatched client messages) (sprintf "Sending %d messages batched" messages) | |
timed (fun () -> receiveBatched client messages 0) "Receiving messages batched" | |
ensureQueueIsEmpty nm queueName |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment