Skip to content

Instantly share code, notes, and snippets.

@clemensv
Created May 10, 2012 19:37
Show Gist options
  • Save clemensv/2655348 to your computer and use it in GitHub Desktop.
Save clemensv/2655348 to your computer and use it in GitHub Desktop.
Message Correlation - Web Role Side
namespace CorrelationSite.Controllers
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Web.Mvc;
using CorrelationSite.Models;
using Microsoft.ServiceBus.Messaging;
/// <summary>
/// MVC controller that sends processing jobs to a Service Bus queue and correlates the
/// replies arriving on the reply queue back to the web request that is waiting for them.
/// </summary>
/// <remarks>
/// A single asynchronous receive loop is shared by all requests. Each request registers a
/// callback under the <c>MessageId</c> of the message it sends; the loop looks the callback
/// up using the <c>CorrelationId</c> of each reply. Replies that nobody is waiting for any
/// more are written straight to the database by <see cref="HandleOrphanJob"/>.
/// </remarks>
public class ProcessingJobsController : Controller
{
    // How long a request blocks waiting for its correlated reply before giving up.
    static readonly TimeSpan replyTimeout = TimeSpan.FromSeconds(1);

    // Callbacks of waiting requests, keyed by the MessageId of the request message.
    // Dictionary<,> is not safe for concurrent writers, so every access goes through
    // pendingThreadsMutex (request threads and the receive callback both mutate it).
    static readonly Dictionary<string, Action<BrokeredMessage>> pendingThreads = new Dictionary<string, Action<BrokeredMessage>>();
    static readonly object pendingThreadsMutex = new object();

    // Guards receiveLoopRunning so that only one receive loop is ever started.
    static readonly object receiveLoopMutex = new object();
    static bool receiveLoopRunning;

    readonly CorrelationSiteContext context = new CorrelationSiteContext();

    // --- start of message correlation section

    /// <summary>Shows the form used to submit a job.</summary>
    public ViewResult MessageCorrelation()
    {
        return this.View();
    }

    /// <summary>
    /// Stores the job, sends it to the processing queue and waits briefly for the reply.
    /// </summary>
    /// <param name="job">The job posted by the form.</param>
    /// <returns>
    /// A redirect to "Details" when the reply arrived in time (the result is saved first),
    /// otherwise a redirect to "Index"; a late reply is then handled as an orphan.
    /// </returns>
    [HttpPost]
    public ActionResult MessageCorrelation(ProcessingJob job)
    {
        this.EnsureMessageCorrelationReceiveLoop();
        var mvcApplication = (MvcApplication) this.HttpContext.ApplicationInstance;

        job = this.context.ProcessingJobs.Add(job);
        this.context.SaveChanges();

        var messageId = Guid.NewGuid().ToString();
        double result = 0;

        // Deliberately not disposed: a callback that was claimed by the receive loop just
        // before this request gave up may still call Set() afterwards.
        var mre = new ManualResetEvent(false);

        // Register before sending so that a fast reply cannot overtake the registration.
        lock (pendingThreadsMutex)
        {
            pendingThreads.Add(
                messageId,
                bm =>
                {
                    result = (double) bm.Properties["Result"];
                    mre.Set();
                });
        }

        var queueClient = mvcApplication.MessagingFactory.CreateQueueClient(mvcApplication.MessageCorrelationQueueName, ReceiveMode.ReceiveAndDelete);
        try
        {
            using (var msg = new BrokeredMessage()
                {
                    MessageId = messageId,
                    ReplyTo = mvcApplication.MessageCorrelationReplyQueueName,
                    Properties =
                        {
                            {"JobId", job.JobId},
                            {"Operation", job.Operation},
                            {"FirstArgument", job.FirstArgument},
                            {"SecondArgument", job.SecondArgument}
                        }
                })
            {
                queueClient.Send(msg);
            }
        }
        catch (Exception)
        {
            // The request never went out, so no reply will ever remove this entry.
            lock (pendingThreadsMutex)
            {
                pendingThreads.Remove(messageId);
            }
            throw;
        }
        finally
        {
            queueClient.Close();
        }

        var replied = mre.WaitOne(replyTimeout);
        if (!replied)
        {
            bool stillPending;
            lock (pendingThreadsMutex)
            {
                stillPending = pendingThreads.Remove(messageId);
            }

            // If the entry is already gone, the receive loop has claimed the callback and
            // is about to run it. Wait once more so that reply is not silently dropped.
            replied = !stillPending && mre.WaitOne(replyTimeout);
        }

        if (!replied)
        {
            // no reply within replyTimeout? go to the index page
            return this.RedirectToAction("Index");
        }

        job.Result = result;
        this.context.SaveChanges();
        return this.RedirectToAction("Details", new {id = job.JobId});
    }

    /// <summary>
    /// Starts the shared receive loop on the reply queue if it is not running yet.
    /// </summary>
    void EnsureMessageCorrelationReceiveLoop()
    {
        lock (receiveLoopMutex)
        {
            if (receiveLoopRunning)
            {
                return;
            }

            var mvcApplication = (MvcApplication) this.HttpContext.ApplicationInstance;
            var queueClient = mvcApplication.MessagingFactory.CreateQueueClient(
                mvcApplication.MessageCorrelationReplyQueueName, ReceiveMode.ReceiveAndDelete);

            // The callback is static so the loop does not keep this controller instance
            // (and its data context) alive for the lifetime of the process.
            queueClient.BeginReceive(MessageCorrelationReceiveLoop, queueClient);
            receiveLoopRunning = true;
        }
    }

    /// <summary>
    /// Completion callback of the pending receive: dispatches the reply to the waiting
    /// request (or the orphan handler) and then issues the next receive.
    /// </summary>
    /// <param name="ar">Async result whose state is the reply queue's <see cref="QueueClient"/>.</param>
    static void MessageCorrelationReceiveLoop(IAsyncResult ar)
    {
        var queueClient = (QueueClient) ar.AsyncState;
        try
        {
            // EndReceive may yield null (nothing received); using tolerates a null resource.
            using (var reply = queueClient.EndReceive(ar))
            {
                if (reply != null)
                {
                    Action<BrokeredMessage> callback = null;
                    var correlationId = reply.CorrelationId;

                    // A reply without a correlation id cannot match any waiter; treat it
                    // as an orphan instead of letting the dictionary lookup throw.
                    if (correlationId != null)
                    {
                        lock (pendingThreadsMutex)
                        {
                            if (pendingThreads.TryGetValue(correlationId, out callback))
                            {
                                pendingThreads.Remove(correlationId);
                            }
                        }
                    }

                    // Run the callback outside the lock so a slow handler cannot block
                    // request threads that are registering or removing entries.
                    if (callback != null)
                    {
                        callback(reply);
                    }
                    else
                    {
                        HandleOrphanJob(reply);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // Keep the loop alive, but make the failure visible.
            System.Diagnostics.Trace.TraceError("Message correlation receive loop failed: {0}", ex);
        }

        try
        {
            queueClient.BeginReceive(MessageCorrelationReceiveLoop, queueClient);
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("Message correlation receive loop stopped: {0}", ex);

            // Clear the flag so the next request starts a fresh loop instead of waiting
            // forever on one that no longer exists.
            lock (receiveLoopMutex)
            {
                receiveLoopRunning = false;
            }
        }
    }

    /// <summary>
    /// Persists the result of a reply whose request is no longer waiting for it.
    /// </summary>
    /// <param name="reply">Reply carrying the "JobId" and "Result" properties.</param>
    static void HandleOrphanJob(BrokeredMessage reply)
    {
        var jobId = (int) reply.Properties["JobId"];
        using (var localcontext = new CorrelationSiteContext())
        {
            // SingleOrDefault yields null for an unknown job id; Single would throw and
            // leave the null check below unreachable.
            var processingJob = localcontext.ProcessingJobs.SingleOrDefault(x => x.JobId == jobId);
            if (processingJob != null)
            {
                processingJob.Result = (double) reply.Properties["Result"];
                localcontext.SaveChanges();
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment