Created
August 15, 2012 02:14
-
-
Save Buildstarted/3354942 to your computer and use it in GitHub Desktop.
Testing Windows Azure Service Bus with reply queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Console sample demonstrating a request/reply exchange over Windows Azure
/// Service Bus queues. A client sends a request to <c>Manager.RequestQueue</c>
/// and waits for the answer on a session of <c>Manager.ReplyQueue</c>; an
/// in-process "server" task receives requests and sends the replies.
/// </summary>
class WindowsAzureServiceBusQueueTest
{
    // Unique id of this client instance. It is used as the session id on the
    // reply queue so that the client only receives replies addressed to it.
    private static readonly string ClientId = Guid.NewGuid().ToString("N");

    private TokenProvider _credentials;
    private NamespaceManager _namespaceClient;
    private MessagingFactory _factory;

    /// <summary>Program entry point: creates the sample and runs it once.</summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        var p = new WindowsAzureServiceBusQueueTest();
        p.Run();
    }

    /// <summary>
    /// Runs one request/reply round trip: ensures the queues exist, starts the
    /// server task, sends a request, prints the "data" property of the reply
    /// (or a notice when no reply arrived) and finally closes the messaging
    /// factory, even when an error occurred on the way.
    /// </summary>
    private void Run()
    {
        //ensure queues exist and what not
        Initialize();

        MessageSession session = null;
        try
        {
            //create a request and response client for the client portion
            var requestClient = _factory.CreateQueueClient(Manager.RequestQueue);
            var responseClient = _factory.CreateQueueClient(Manager.ReplyQueue);

            //accept the session named after this client so that only replies
            //addressed to ClientId are received here
            session = responseClient.AcceptMessageSession(ClientId);

            //start the listening server (it keeps running in the background)
            StartServer();

            //send a message; SendMessage returns an unstarted task
            var client = SendMessage(requestClient, session);
            client.Start();
            client.Wait();

            var reply = client.Result;
            if (reply == null)
            {
                //Receive returns null when the timeout elapses without a message
                Console.WriteLine("No reply received before the timeout elapsed");
            }
            else
            {
                object data;
                if (reply.Properties.TryGetValue("data", out data) && data != null)
                {
                    Console.WriteLine(data.ToString());
                }
                else
                {
                    Console.WriteLine("Reply did not contain a \"data\" property");
                }

                //settle the reply so it does not linger in the reply queue
                reply.Complete();
            }

            Console.WriteLine("Press any key to continue");
            Console.ReadKey();
        }
        finally
        {
            //release the session first, then tear down the factory; the nested
            //finally guarantees the factory is closed even if the session
            //fails to close
            try
            {
                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                _factory.Close();
            }
        }
    }

    /// <summary>
    /// Creates the token provider, namespace manager and messaging factory from
    /// the settings in <c>Manager</c>, and creates the request queue and the
    /// session-enabled reply queue when they do not exist yet.
    /// </summary>
    private void Initialize()
    {
        _credentials = TokenProvider.CreateSharedSecretTokenProvider(Manager.IssuerName, Manager.IssuerKey);

        //both the management client and the messaging factory use the same address
        var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Manager.ServiceNamespace, string.Empty);
        _namespaceClient = new NamespaceManager(serviceUri, _credentials);
        _factory = MessagingFactory.Create(serviceUri, _credentials);

        //ensure the queues exist
        if (!_namespaceClient.QueueExists(Manager.RequestQueue))
        {
            var queue = new QueueDescription(Manager.RequestQueue);
            _namespaceClient.CreateQueue(queue);
        }

        if (!_namespaceClient.QueueExists(Manager.ReplyQueue))
        {
            //replies are routed by session id, so the queue must require sessions
            var queue = new QueueDescription(Manager.ReplyQueue);
            queue.RequiresSession = true;
            _namespaceClient.CreateQueue(queue);
        }
    }

    /// <summary>
    /// Starts the background server loop. The loop receives requests from
    /// <c>Manager.RequestQueue</c> and answers each one on the queue named in
    /// the request's <c>ReplyTo</c>, using the request's
    /// <c>ReplyToSessionId</c> as the session id of the reply. The loop ends
    /// when a receive call returns no message.
    /// </summary>
    /// <returns>The already started task that runs the server loop.</returns>
    private Task StartServer()
    {
        return Task.Factory.StartNew(() =>
        {
            //create a client for the incoming requests
            var requestClient = _factory.CreateQueueClient(Manager.RequestQueue);
            BrokeredMessage request;

            //wait for a request a max of 24 hours
            while ((request = requestClient.Receive(TimeSpan.FromHours(24))) != null)
            {
                Console.WriteLine("Received message: " + request.MessageId);
                Console.WriteLine();

                if (string.IsNullOrEmpty(request.ReplyTo))
                {
                    //nowhere to send a reply; settle the request so it is not
                    //delivered again and keep serving
                    request.Complete();
                    continue;
                }

                var response = new BrokeredMessage
                {
                    //set these values to the original request
                    //this allows the requestor to listen for
                    //this specific message
                    SessionId = request.ReplyToSessionId,
                    MessageId = request.MessageId
                };
                response.Properties["data"] = "muhahahahaha!";

                //one client per reply; close it so clients do not pile up
                //while the server keeps running
                var responseClient = _factory.CreateQueueClient(request.ReplyTo);
                try
                {
                    //send the reply
                    responseClient.Send(response);
                }
                finally
                {
                    responseClient.Close();
                }

                //mark the request as complete only after the reply was sent,
                //so a failed reply does not lose the request
                request.Complete();
            }
        });
    }

    /// <summary>
    /// Builds a task that sends one request and then waits up to 15 seconds
    /// for a message on the given reply session.
    /// </summary>
    /// <param name="requestClient">Client used to send the request.</param>
    /// <param name="session">Reply-queue session on which the answer is awaited.</param>
    /// <returns>
    /// An unstarted task (the caller must call <c>Start</c>) whose result is
    /// the message received on <paramref name="session"/>, or <c>null</c> when
    /// nothing arrived before the timeout.
    /// </returns>
    private Task<BrokeredMessage> SendMessage(QueueClient requestClient, MessageSession session)
    {
        return new Task<BrokeredMessage>(() =>
        {
            string messageId = Guid.NewGuid().ToString("N");
            BrokeredMessage message = new BrokeredMessage
            {
                //ReplyQueue is separate from the
                //RequestQueue
                //This tells the server to send it here
                ReplyTo = Manager.ReplyQueue,
                ReplyToSessionId = ClientId,
                MessageId = messageId
            };

            Console.WriteLine("Session Id: " + ClientId);
            Console.WriteLine("Sending message: " + messageId);
            Console.WriteLine();

            message.Properties["data"] = "blah blah blah";
            requestClient.Send(message);

            //checking for receipt
            Console.WriteLine("Waiting for message");
            return session.Receive(TimeSpan.FromSeconds(15));
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment