Skip to content

Instantly share code, notes, and snippets.

@sitereactor
Last active August 29, 2015 13:56
Show Gist options
  • Save sitereactor/8953583 to your computer and use it in GitHub Desktop.
Save sitereactor/8953583 to your computer and use it in GitHub Desktop.
Code sample for "Continuously receive messages async from Azure Service Bus Queues" posted here http://codereview.stackexchange.com/questions/41462/continuously-receive-messages-async-from-azure-service-bus-queues
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Concorde.AzurePack.Domain.Websites.Preallocation.Settings;
using Concorde.Contracts.Processes;
using Concorde.Infrastructure.Messaging;
using Microsoft.ServiceBus.Messaging;
namespace Allocation
{
public class Processor : IProcessor, IDisposable
{
private bool _disposed;
private CancellationTokenSource _cancellationSource;
private readonly object _lockObject = new object();
private readonly AllocationSettings _allocationSettings;
private readonly ProjectSettings _projectSettings;
private readonly QueueClient _queueClient;
public Processor(AllocationSettings allocationSettings,
ProjectSettings projectSettings,
QueueClient queueClient)
{
_allocationSettings = allocationSettings;
_projectSettings = projectSettings;
_queueClient = queueClient;
}
public void Start()
{
ThrowIfDisposed();
lock (_lockObject)
{
Trace.WriteLine(string.Format("Started '{0}' allocation.", _projectSettings.Name));
_cancellationSource = new CancellationTokenSource();
Task.Factory.StartNew(ReceiveNextMessage, _cancellationSource.Token);
}
}
public void Stop()
{
lock (_lockObject)
{
using (_cancellationSource)
{
if (_cancellationSource != null)
{
_cancellationSource.Cancel();
_cancellationSource = null;
}
}
Trace.WriteLine(string.Format("Stopped '{0}' allocation.", _projectSettings.Name));
}
}
/// <summary>
/// Disposes the resources used by the processor.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes the resources used by the processor.
/// </summary>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
Stop();
_disposed = true;
//Dispose other stuff
}
}
}
~ProjectAllocationProcessor()
{
Dispose(false);
}
private void ReceiveMessage()
{
Task.Run(async () =>
{
//Receive a new message async within 1 minute
return await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));
})
.ContinueWith(t =>
{
if (t.Result != null)
{
//The result is not null so we process the Brokered Message
Task.Factory.StartNew(() =>
{
ProcessMessage(t.Result)
ReceiveNextMessage();
},
_cancellationSource.Token);
}
else
{
// Continue receiving and processing new messages until told to stop.
ReceiveNextMessage();
}
});
}
private void ReceiveNextMessage()
{
if (_cancellationSource.IsCancellationRequested == false)
{
//Continue to receive new messages every 1 minute
Task.Delay(TimeSpan.FromMinutes(1))
.ContinueWith(t => ReceiveMessage());
}
}
private void ProcessMessage(BrokeredMessage result)
{
var releaseAction = MessageReleaseAction.AbandonMessage;
try
{
// Make sure the process was told to stop receiving while it was waiting for a new message.
if (_cancellationSource.IsCancellationRequested == false)
{
// Process the received message.
// Update the 'release action'
}
}
finally
{
//ReleaseMessage
ReleaseMessage(result, releaseAction);
}
}
private void ReleaseMessage(BrokeredMessage msg, MessageReleaseAction releaseAction)
{
//Release message according to release actions:
//MessageReleaseActionKind.Complete
//MessageReleaseActionKind.Abandon
//MessageReleaseActionKind.DeadLetter
}
private void ThrowIfDisposed()
{
if (_disposed)
throw new ObjectDisposedException("Processor");
}
}
}
@lars-erik
Copy link

I'd refactor more:

    private void ReceiveMessage()
    {
        Task.Run(async () => await ReceiveAsync())
            .ContinueWith(ProcessResult);
    }

    private async Task<BrokeredMessage> ReceiveAsync()
    {
        return await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));
    }

    private void ProcessResult(Task<BrokeredMessage> t)
    {
        if (t.Result != null)
        {
            //The result is not null so we process the Brokered Message
            Task.Factory.StartNew(() => ProcessMessage(t.Result), _cancellationSource.Token);
        }
        else
        {
            // Continue receiving and processing new messages until told to stop.
            ReceiveNextMessage();
        }
    }

    private void ReceiveNextMessage()
    {
        if (_cancellationSource.IsCancellationRequested == false)
        {
            //Continue to receive new messages every 1 minute
            Task.Delay(TimeSpan.FromMinutes(1))
                .ContinueWith(t => ReceiveMessage());
        }
    }

@agehrke
Copy link

agehrke commented Feb 17, 2014

I don't really get the usage of StartNew() and ContinueWith. Shouldn't something like this ReceiveAndProcessMessagesAsync method accomplish the same?

@sitereactor
Copy link
Author

@agehrke Yes, it should. Didn't consider using multiple awaits, but really like your suggestion. Much more elegant :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment