Last active
April 12, 2018 13:33
-
-
Save xinmyname/d647a4e324ac43921803b9539238edc7 to your computer and use it in GitHub Desktop.
MessageQueue extensions for C# that are generic, thread-safe, cancellable and can be timed out.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Messaging; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace MessageCore | |
{ | |
/// <summary>
/// Helpers for opening a private MSMQ queue on the local machine and for
/// sending and receiving messages through a <see cref="MessageQueue"/>.
/// </summary>
public static class MessageQueueFactory
{
    /// <summary>
    /// Looks up a private queue on the local machine by its <see cref="MessageQueue.QueueName"/>.
    /// </summary>
    /// <param name="name">Queue name to match exactly against <see cref="MessageQueue.QueueName"/>.</param>
    /// <returns>
    /// The matching queue with a <see cref="BinaryMessageFormatter"/> installed,
    /// or <c>null</c> when no private queue has that name.
    /// </returns>
    public static MessageQueue OpenLocal(string name)
    {
        MessageQueue[] candidates = MessageQueue.GetPrivateQueuesByMachine(Environment.MachineName);
        MessageQueue match = null;

        try
        {
            match = candidates.SingleOrDefault(q => q.QueueName == name);
        }
        finally
        {
            // Every element of the array is a live MessageQueue; release the ones
            // that are not handed back to the caller (all of them if the lookup threw).
            foreach (MessageQueue candidate in candidates)
            {
                if (!ReferenceEquals(candidate, match))
                {
                    candidate.Dispose();
                }
            }
        }

        if (match != null)
        {
            // SECURITY NOTE(review): BinaryMessageFormatter deserializes with BinaryFormatter,
            // which is unsafe for untrusted message content. Kept because existing senders and
            // receivers depend on this wire format; only use with trusted producers.
            match.Formatter = new BinaryMessageFormatter();
        }

        return match;
    }

    /// <summary>
    /// Wraps <paramref name="body"/> in a <see cref="Message"/> using the queue's formatter
    /// and sends it with <see cref="MessageQueueTransactionType.Single"/>.
    /// </summary>
    /// <param name="queue">Queue to send to.</param>
    /// <param name="body">Object to use as the message body.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is <c>null</c>.</exception>
    public static void SendMessage(this MessageQueue queue, object body)
    {
        if (queue == null)
        {
            throw new ArgumentNullException(nameof(queue));
        }

        // Message is IDisposable; release it as soon as the synchronous send returns.
        using (var message = new Message(body, queue.Formatter))
        {
            // Serializes senders and receivers that go through these helpers on the same queue instance.
            lock (queue)
            {
                queue.Send(message, MessageQueueTransactionType.Single);
            }
        }
    }

    /// <summary>
    /// Waits on a thread-pool thread for a message to arrive, then receives it.
    /// </summary>
    /// <typeparam name="T">
    /// <see cref="Message"/> to get the raw message, otherwise the expected type of the message body.
    /// </typeparam>
    /// <param name="queue">Queue to receive from.</param>
    /// <param name="millisTimeout">
    /// Maximum time to wait in milliseconds, or <see cref="Timeout.Infinite"/> to wait indefinitely.
    /// </param>
    /// <param name="cancellationToken">Token that aborts the wait.</param>
    /// <returns>
    /// A task that yields the message (or its body cast to <typeparamref name="T"/>),
    /// yields <c>null</c> when the wait timed out or the body is not a <typeparamref name="T"/>,
    /// or is canceled when <paramref name="cancellationToken"/> fires first.
    /// </returns>
    public static Task<T> ReceiveMessage<T>(this MessageQueue queue, int millisTimeout, CancellationToken cancellationToken) where T : class
    {
        return Task.Run(() =>
        {
            // Same exception type WaitHandle.WaitAny raises for an invalid timeout.
            if (millisTimeout < Timeout.Infinite)
            {
                throw new ArgumentOutOfRangeException(nameof(millisTimeout));
            }

            var elapsed = System.Diagnostics.Stopwatch.StartNew();

            while (true)
            {
                int remaining = millisTimeout == Timeout.Infinite
                    ? Timeout.Infinite
                    : (int)Math.Max(0L, millisTimeout - elapsed.ElapsedMilliseconds);

                // Bound the peek by the same timeout so a timed-out call does not
                // leave a pending peek outstanding on the queue indefinitely.
                TimeSpan peekTimeout = remaining == Timeout.Infinite
                    ? MessageQueue.InfiniteTimeout
                    : TimeSpan.FromMilliseconds(remaining);

                IAsyncResult pendingPeek = queue.BeginPeek(peekTimeout);
                var waitHandles = new[] { cancellationToken.WaitHandle, pendingPeek.AsyncWaitHandle };
                int signaled = WaitHandle.WaitAny(waitHandles, remaining);

                if (signaled == WaitHandle.WaitTimeout)
                {
                    return Task.FromResult<T>(null);
                }

                if (signaled == 0)
                {
                    return Task.FromCanceled<T>(cancellationToken);
                }

                Message message;
                try
                {
                    queue.EndPeek(pendingPeek);

                    lock (queue)
                    {
                        // Zero timeout: the peek only proves a message WAS available. If another
                        // consumer took it meanwhile, fail fast instead of blocking forever
                        // while holding the lock and ignoring the timeout/cancellation.
                        message = queue.Receive(TimeSpan.Zero, MessageQueueTransactionType.Single);
                    }
                }
                catch (MessageQueueException ex) when (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // Either the bounded peek expired or the message was consumed elsewhere.
                    if (millisTimeout != Timeout.Infinite && elapsed.ElapsedMilliseconds >= millisTimeout)
                    {
                        return Task.FromResult<T>(null);
                    }

                    continue; // time is left: wait for the next message
                }

                if (typeof(T) == typeof(Message))
                {
                    // Caller takes ownership of the message itself.
                    return Task.FromResult(message as T);
                }

                // Only the body leaves this method, so the message wrapper can be released.
                using (message)
                {
                    return Task.FromResult(message.Body as T);
                }
            }
        }, cancellationToken);
    }

    /// <summary>
    /// Receives a message with no timeout; see
    /// <see cref="ReceiveMessage{T}(MessageQueue, int, CancellationToken)"/>.
    /// </summary>
    /// <param name="queue">Queue to receive from.</param>
    /// <param name="cancellationToken">Token that aborts the wait.</param>
    public static Task<T> ReceiveMessage<T>(this MessageQueue queue, CancellationToken cancellationToken) where T : class
    {
        return ReceiveMessage<T>(queue, Timeout.Infinite, cancellationToken);
    }

    /// <summary>
    /// Receives a message with no timeout and no cancellation; see
    /// <see cref="ReceiveMessage{T}(MessageQueue, int, CancellationToken)"/>.
    /// </summary>
    /// <param name="queue">Queue to receive from.</param>
    public static Task<T> ReceiveMessage<T>(this MessageQueue queue) where T : class
    {
        return ReceiveMessage<T>(queue, Timeout.Infinite, CancellationToken.None);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment