Skip to content

Instantly share code, notes, and snippets.

@kellypleahy
Created March 24, 2011 07:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kellypleahy/884685 to your computer and use it in GitHub Desktop.
Save kellypleahy/884685 to your computer and use it in GitHub Desktop.
An attempt at a queue watcher that is more correct for our needs.
/// <summary>
/// A watcher that can be started. In this file it is implemented by
/// <c>QueueWatcherBase&lt;TProcessor&gt;</c>, which polls a queue and forwards
/// each message to a processor.
/// </summary>
public interface IQueueWatcher
{
    /// <summary>
    /// Starts the watcher.
    /// </summary>
    void Start();
}
/// <summary>
/// Handles the raw body of a single queue message.
/// </summary>
public interface IQueueMessageProcessor
{
    /// <summary>
    /// Processes one message.
    /// </summary>
    /// <param name="messageBytes">The message body as raw bytes.</param>
    void ProcessMessage(byte[] messageBytes);
}
/// <summary>
/// Message processor for the job-status queue. At present it only logs the
/// size of each message it is given.
/// </summary>
public class JobStatusMessageProcessor: IQueueMessageProcessor
{
    /// <summary>
    /// Logger used by this processor. Initialised to a <c>NullLogger</c> in the
    /// constructor; settable so a real logger can be supplied.
    /// </summary>
    public ILogger Log { get; set; }

    /// <summary>
    /// Creates the processor with a <c>NullLogger</c> as its logger.
    /// </summary>
    public JobStatusMessageProcessor()
    {
        Log = new NullLogger();
    }

    /// <summary>
    /// Logs the size, in bytes, of the supplied message body.
    /// </summary>
    /// <param name="messageBytes">
    /// The message body. A null body is logged as size 0 instead of throwing.
    /// </param>
    public void ProcessMessage(byte[] messageBytes)
    {
        // Log the byte count, not the array: formatting a byte[] with {0}
        // prints its type name ("System.Byte[]") rather than a size.
        var size = messageBytes == null ? 0 : messageBytes.Length;
        Log.Debug("Processing message of size: {0}", size);
    }
}
/// <summary>
/// Queue watcher bound to <c>JobStatusMessageProcessor</c>. It supplies the
/// queue name, sleep interval and visibility timeout that the base class uses.
/// </summary>
public class JobStatusQueueWatcher: QueueWatcherBase<JobStatusMessageProcessor>
{
    private const string WatchedQueueName = "w-queue-test";

    // 500 milliseconds.
    private static readonly TimeSpan PollSleepInterval = new TimeSpan(0, 0, 0, 0, 500);

    // One minute.
    private static readonly TimeSpan MessageVisibilityTimeout = new TimeSpan(0, 1, 0);

    /// <summary>
    /// Passes the collaborators straight through to the base class.
    /// </summary>
    public JobStatusQueueWatcher(JobStatusMessageProcessor processor, ITimerFacade timerFacade, IQueueService queueService)
        : base(processor, timerFacade, queueService)
    {
    }

    /// <summary>Name of the queue this watcher reads from.</summary>
    public override string QueueName
    {
        get { return WatchedQueueName; }
    }

    /// <summary>Delay (500 ms) requested before the next polling pass.</summary>
    public override TimeSpan SleepInterval
    {
        get { return PollSleepInterval; }
    }

    /// <summary>Visibility timeout (1 minute) requested when fetching a message.</summary>
    public override TimeSpan VisibilityTimeout
    {
        get { return MessageVisibilityTimeout; }
    }
}
/// <summary>
/// Base class for queue watchers. When started it hands a callback to the
/// timer facade; each time the callback runs it drains the queue, passing every
/// message body to the processor and deleting the message, and then asks the
/// timer facade to run the callback again after <see cref="SleepInterval"/>.
/// </summary>
/// <typeparam name="TProcessor">Type of the processor that handles each message body.</typeparam>
public abstract class QueueWatcherBase<TProcessor>: IQueueWatcher
    where TProcessor : IQueueMessageProcessor
{
    // -1 ms; always passed as the second argument to the timer facade so the
    // callback is scheduled once per Start/Change call rather than periodically.
    private readonly TimeSpan _infiniteTimeout;

    /// <summary>
    /// Logger used by the watcher. Initialised to a <c>NullLogger</c> in the constructor.
    /// </summary>
    public ILogger Log { get; set; }

    private readonly TProcessor _processor;
    private readonly ITimerFacade _timerFacade;
    private readonly IQueueService _queueService;

    /// <summary>Queue name passed to <c>IQueueService.GetMessage</c>.</summary>
    public abstract string QueueName { get; }

    /// <summary>Delay passed to <c>ITimerFacade.Change</c> after each polling pass.</summary>
    public abstract TimeSpan SleepInterval { get; }

    /// <summary>Visibility timeout passed to <c>IQueueService.GetMessage</c>.</summary>
    public abstract TimeSpan VisibilityTimeout { get; }

    /// <summary>
    /// Stores the collaborators used by the polling callback.
    /// </summary>
    /// <param name="processor">Receives the body of every message fetched.</param>
    /// <param name="timerFacade">Schedules the polling callback.</param>
    /// <param name="queueService">Source of messages; also used to delete them.</param>
    protected QueueWatcherBase(TProcessor processor, ITimerFacade timerFacade, IQueueService queueService)
    {
        Log = new NullLogger();
        _infiniteTimeout = TimeSpan.FromMilliseconds(-1);
        _processor = processor;
        _queueService = queueService;
        _timerFacade = timerFacade;
    }

    /// <summary>
    /// Logs that the watcher started and registers the polling callback with the timer facade.
    /// </summary>
    void IQueueWatcher.Start()
    {
        Log.Debug("QueueWatcher started for processor type: " + typeof(TProcessor).Name);
        _timerFacade.Start(ProcessMessages, _infiniteTimeout);
    }

    /// <summary>
    /// One polling pass: fetches messages until the queue service returns null,
    /// processing and deleting each one, then re-arms the timer.
    /// </summary>
    private void ProcessMessages()
    {
        try
        {
            var message = _queueService.GetMessage(QueueName, VisibilityTimeout);
            while (message != null)
            {
                try
                {
                    _processor.ProcessMessage(message.MessageBody);
                }
                catch (Exception ex)
                {
                    Log.Debug("Error processing message {0} from queue {1}, moving to dead message queue {1}-errors\n --- exception was: {2}", message.MessageId, QueueName, ex);
                    // TODO: move the message to a dead-letter queue with the name of the original queue + "-errors".
                    // Not implemented yet: the failed message is only logged and then deleted below.
                }
                // The message is deleted whether or not the processor succeeded.
                _queueService.DeleteMessage(message.MessageId, message.PopId);
                message = _queueService.GetMessage(QueueName, VisibilityTimeout);
            }
        }
        finally
        {
            // Re-arm in a finally block: the timer is scheduled one pass at a time,
            // so if the queue service throws and this call is skipped, the callback
            // is never scheduled again. The exception itself still propagates.
            Log.Debug("Sleeping for {0}.", SleepInterval);
            _timerFacade.Change(SleepInterval, _infiniteTimeout);
        }
    }
}
// Tests for QueueWatcherBase<TProcessor>, exercised through the minimal QueueWatcherTest subclass
// declared below. The processor, timer facade and queue service are all strict mocks created from
// a MockRepository (the API used here looks like Rhino Mocks - confirm against the project references).
[TestFixture]
public class QueueWatcherBaseTestContext
{
// The callback the watcher passes to ITimerFacade.Start; captured in SetUp so tests can invoke it directly.
private Action _saveAction;
private MockRepository _mockRepository;
private IProcessor _processor;
private ITimerFacade _timerFacade;
private IQueueService _queueService;
// -1 ms, the same value QueueWatcherBase passes as the second timer argument.
private TimeSpan _infiniteTimeout;
private QueueWatcherTest _queueWatcherTest;
// Private processor interface used as the TProcessor type argument for the watcher under test.
private interface IProcessor: IQueueMessageProcessor
{
}
// Concrete watcher with fixed settings: queue "queue", 3 minute sleep interval, 15 minute visibility timeout.
private class QueueWatcherTest: QueueWatcherBase<IProcessor>
{
public QueueWatcherTest(IProcessor processor, ITimerFacade timerFacade, IQueueService queueService)
: base(processor, timerFacade, queueService)
{
}
public override string QueueName
{
get { return "queue"; }
}
public override TimeSpan SleepInterval
{
get { return TimeSpan.FromMinutes(3); }
}
public override TimeSpan VisibilityTimeout
{
get { return TimeSpan.FromMinutes(15); }
}
}
// Builds fresh mocks and a fresh watcher, then starts the watcher. The only expectation recorded
// here is a call to ITimerFacade.Start with any Action and the infinite timeout; the Action that
// is passed is stored in _saveAction.
[SetUp]
public void SetUp()
{
_mockRepository = new MockRepository();
_processor = _mockRepository.StrictMock<IProcessor>();
_timerFacade = _mockRepository.StrictMock<ITimerFacade>();
_queueService = _mockRepository.StrictMock<IQueueService>();
_infiniteTimeout = TimeSpan.FromMilliseconds(-1);
_saveAction = null;
_queueWatcherTest = new QueueWatcherTest(_processor, _timerFacade, _queueService);
// Start() is an explicit interface implementation, so it must be called through IQueueWatcher.
IQueueWatcher queueWatcher = _queueWatcherTest;
using (_mockRepository.Record())
{
_timerFacade.Expect(x => x.Start(Arg<Action>.Is.Anything, Arg<TimeSpan>.Is.Equal(_infiniteTimeout)))
.WhenCalled(x => _saveAction = (Action)x.Arguments[0]);
}
using (_mockRepository.Playback())
{
queueWatcher.Start();
}
}
// Starting the watcher must have handed a callback to the timer facade.
[Test]
public void it_schedules_the_action()
{
Assert.That(_saveAction, Is.Not.Null);
}
// Two messages are available, then the queue is empty. Each message should be processed and
// deleted, and afterwards the timer should be changed to fire after the sleep interval.
[Test]
public void the_action_calls_the_processor_for_as_many_queue_messages_are_available_and_then_reschedules_itself()
{
var message1 = new QueueMessage
{
MessageBody = new byte[] {0},
MessageId = "1",
PopId = "p1",
};
var message2 = new QueueMessage
{
MessageBody = new byte[] {1},
MessageId = "2",
PopId = "p2",
};
// Presumably puts every mock back into record mode so new expectations can be set after
// the playback in SetUp - confirm against the mocking library's documentation.
_mockRepository.BackToRecordAll();
using (_mockRepository.Record())
{
// Calls listed in the order the watcher is expected to make them.
// NOTE(review): whether the repository enforces this ordering depends on the mocking library - confirm.
_queueService.Expect(x => x.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
.Return(message1);
_processor.ProcessMessage(message1.MessageBody);
_queueService.DeleteMessage(message1.MessageId, message1.PopId);
_queueService.Expect(x => x.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
.Return(message2);
_processor.ProcessMessage(message2.MessageBody);
_queueService.DeleteMessage(message2.MessageId, message2.PopId);
// A null message ends the polling loop.
_queueService.Expect(x => x.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
.Return(null);
_timerFacade.Change(_queueWatcherTest.SleepInterval, _infiniteTimeout);
}
using(_mockRepository.Playback())
{
// Run one polling pass by invoking the callback captured in SetUp.
_saveAction();
}
}
// The processor throws for the only available message. The watcher should still delete that
// message, poll again, and change the timer to fire after the sleep interval.
[Test]
public void the_action_still_deletes_the_message_and_reschedules_itself_even_if_the_processor_throws_an_exception()
{
var message1 = new QueueMessage
{
MessageBody = new byte[] { 0 },
MessageId = "1",
PopId = "p1",
};
// Presumably resets the mocks to record mode, as in the previous test - confirm.
_mockRepository.BackToRecordAll();
using (_mockRepository.Record())
{
_queueService.Expect(x => x.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
.Return(message1);
// Make the processor fail when it is given the message body.
_processor.Expect(x => x.ProcessMessage(message1.MessageBody)).WhenCalled(x => { throw new InvalidOperationException(); });
_queueService.DeleteMessage(message1.MessageId, message1.PopId);
_queueService.Expect(x => x.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
.Return(null);
_timerFacade.Change(_queueWatcherTest.SleepInterval, _infiniteTimeout);
}
using (_mockRepository.Playback())
{
_saveAction();
}
}
}
/// <summary>
/// Plain data holder for a message returned by <c>IQueueService.GetMessage</c>.
/// </summary>
public class QueueMessage
{
    /// <summary>Raw message payload; this is what gets handed to the message processor.</summary>
    public byte[] MessageBody;

    /// <summary>Identifier of the message; passed as the first argument to <c>IQueueService.DeleteMessage</c>.</summary>
    public string MessageId;

    /// <summary>Passed together with <see cref="MessageId"/> when deleting the message.</summary>
    public string PopId;
}
/// <summary>
/// Access to a message queue: fetch a message and delete a message.
/// </summary>
public interface IQueueService
{
    /// <summary>
    /// Fetches a message from the named queue.
    /// </summary>
    /// <param name="queueName">Name of the queue to read from.</param>
    /// <param name="visibilityTimeout">Visibility timeout to request for the fetched message.</param>
    /// <returns>
    /// The message, or null. Callers in this file treat null as "no message available".
    /// </returns>
    QueueMessage GetMessage(string queueName, TimeSpan visibilityTimeout);

    /// <summary>
    /// Deletes a previously fetched message.
    /// </summary>
    /// <param name="messageId">The <c>MessageId</c> of the message to delete.</param>
    /// <param name="popId">The <c>PopId</c> of the message to delete.</param>
    void DeleteMessage(string messageId, string popId);
}
/// <summary>
/// Stand-in queue service driven by the wall clock. It hands out a single fake
/// message while the current second is a multiple of five, and nothing otherwise.
/// </summary>
public class FakeQueueService : IQueueService
{
    // True once a message has been handed out during the current "active" second;
    // cleared by any call made while the second is not a multiple of five.
    private bool _dequeued;

    /// <summary>
    /// Returns a new fake message on the first call made while the current
    /// second is a multiple of five; returns null on every other call.
    /// Both arguments are ignored.
    /// </summary>
    public QueueMessage GetMessage(string queueName, TimeSpan visibilityTimeout)
    {
        var inActiveSecond = DateTime.Now.Second % 5 == 0;
        if (!inActiveSecond)
        {
            _dequeued = false;
            return null;
        }

        if (_dequeued)
            return null;

        _dequeued = true;
        var fakeMessage = new QueueMessage();
        fakeMessage.MessageId = Guid.NewGuid().ToString();
        fakeMessage.PopId = "blah";
        return fakeMessage;
    }

    /// <summary>
    /// Does nothing; the fake keeps no messages to delete.
    /// </summary>
    public void DeleteMessage(string messageId, string popId)
    {
    }
}
/// <summary>
/// Abstraction over a timer so that scheduling can be mocked in tests.
/// </summary>
public interface ITimerFacade
{
    /// <summary>
    /// Registers the callback and starts the timer.
    /// </summary>
    /// <param name="action">Callback to run when the timer fires.</param>
    /// <param name="timeout">
    /// In the <c>TimerFacade</c> implementation in this file this value is passed
    /// as the timer's period; callers here pass -1 ms.
    /// </param>
    void Start(Action action, TimeSpan timeout);

    /// <summary>
    /// Reschedules an already started timer.
    /// </summary>
    /// <param name="interval">
    /// In the <c>TimerFacade</c> implementation this is the delay before the next firing.
    /// </param>
    /// <param name="timeout">
    /// In the <c>TimerFacade</c> implementation this value is passed as the timer's period.
    /// </param>
    void Change(TimeSpan interval, TimeSpan timeout);

    /// <summary>
    /// Stops the timer.
    /// </summary>
    void Stop();
}
/// <summary>
/// <see cref="ITimerFacade"/> implementation backed by a <see cref="System.Threading.Timer"/>.
/// </summary>
public class TimerFacade : ITimerFacade, IDisposable
{
    private readonly Timer _timer;

    // The registered callback; null means "not started" (or stopped).
    private Action _action;

    /// <summary>
    /// Creates the underlying timer. It is not scheduled until <see cref="Start"/> is called.
    /// </summary>
    public TimerFacade()
    {
        _timer = new Timer(obj => InvokeAction());
    }

    /// <summary>
    /// Registers the callback and schedules it to run immediately (due time zero).
    /// </summary>
    /// <param name="periodicAction">Callback to run when the timer fires; must not be null.</param>
    /// <param name="timeout">Passed to the underlying timer as its period.</param>
    /// <exception cref="ArgumentNullException"><paramref name="periodicAction"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The timer has already been started.</exception>
    public void Start(Action periodicAction, TimeSpan timeout)
    {
        if (periodicAction == null)
            throw new ArgumentNullException("periodicAction");
        if (_action != null)
            throw new InvalidOperationException("timer already started");
        _action = periodicAction;
        _timer.Change(TimeSpan.Zero, timeout);
    }

    /// <summary>
    /// Reschedules the timer.
    /// </summary>
    /// <param name="dueTime">Delay before the callback next runs.</param>
    /// <param name="timeout">Passed to the underlying timer as its period.</param>
    /// <exception cref="InvalidOperationException">The timer has not been started.</exception>
    public void Change(TimeSpan dueTime, TimeSpan timeout)
    {
        if (_action == null)
            throw new InvalidOperationException("timer not yet started");
        _timer.Change(dueTime, timeout);
    }

    /// <summary>
    /// Clears the registered callback and disables the underlying timer.
    /// </summary>
    /// <exception cref="InvalidOperationException">The timer has not been started.</exception>
    public void Stop()
    {
        if (_action == null)
            throw new InvalidOperationException("timer not yet started");
        _action = null;
        _timer.Change(-1, -1);
    }

    /// <summary>
    /// Disposes the underlying timer.
    /// </summary>
    public void Dispose()
    {
        _timer.Dispose();
    }

    // Timer callback. Takes a snapshot of the delegate first: Stop() may set _action
    // to null after a callback has already been queued, and invoking the field
    // directly would then throw a NullReferenceException on the timer's thread.
    private void InvokeAction()
    {
        var action = _action;
        if (action != null)
            action();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment