Skip to content

Instantly share code, notes, and snippets.

@xinmyname
Last active April 12, 2018 13:33
Show Gist options
  • Save xinmyname/d647a4e324ac43921803b9539238edc7 to your computer and use it in GitHub Desktop.
Save xinmyname/d647a4e324ac43921803b9539238edc7 to your computer and use it in GitHub Desktop.
MessageQueue extensions for C# that are generic, thread-safe, cancellable and can be timed out.
using System;
using System.Linq;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
namespace MessageCore
{
public static class MessageQueueFactory
{
public static MessageQueue OpenLocal(string name)
{
MessageQueue queue = MessageQueue.GetPrivateQueuesByMachine(Environment.MachineName).SingleOrDefault(q => q.QueueName == name);
if (queue != null)
queue.Formatter = new BinaryMessageFormatter();
return queue;
}
public static void SendMessage(this MessageQueue queue, object body)
{
var message = new Message(body, queue.Formatter);
lock (queue)
{
queue.Send(message, MessageQueueTransactionType.Single);
}
}
public static Task<T> ReceiveMessage<T>(this MessageQueue queue, int millisTimeout, CancellationToken cancellationToken) where T: class
{
return Task.Run(() =>
{
IAsyncResult asyncResult = queue.BeginPeek();
var waitHandles = new[] { cancellationToken.WaitHandle, asyncResult.AsyncWaitHandle };
int waitIndex = WaitHandle.WaitAny(waitHandles, millisTimeout);
switch (waitIndex)
{
case WaitHandle.WaitTimeout:
return Task.FromResult<T>(null);
case 0:
return Task.FromCanceled<T>(cancellationToken);
default:
{
queue.EndPeek(asyncResult);
Message message;
lock (queue)
{
message = queue.Receive(MessageQueueTransactionType.Single);
}
T result = typeof(T) == typeof(Message)
? message as T
: message?.Body as T;
return Task.FromResult(result);
}
}
}, cancellationToken);
}
public static Task<T> ReceiveMessage<T>(this MessageQueue queue, CancellationToken cancellationToken) where T: class
{
return ReceiveMessage<T>(queue, Timeout.Infinite, cancellationToken);
}
public static Task<T> ReceiveMessage<T>(this MessageQueue queue) where T: class
{
return ReceiveMessage<T>(queue, Timeout.Infinite, CancellationToken.None);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment