MSMQ using Rx - Code snippet for MSMQ receive timeout problem

  • Download Gist
MSMQ using Rx - Code snippet for MSMQ receive timeout problem.cs
C#
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
using System;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
 
namespace MSMQPubSubRx
{
class Program
{
public static MessageQueue msmqQueue;
public static string queueMessageString = "Test String to queue in the MSMQ";
 
static void Main(string[] args)
{
msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });
 
// Part of code to run for queueing of messages in message queue.
//for (int i = 0; ; i++)
//{
// msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
//}
 
// Part of code to run for dequeuing of message from message queue
StartQueueMessageReceivingProcess();
 
Console.ReadLine();
}
 
/// <summary>
/// Start receiving client messages using Rx & Observable.
/// </summary>
static void StartQueueMessageReceivingProcess()
{
var incomingMessage = StartDequeueing()
.Repeat()
//.TakeWhile(x => x != null)
.SubscribeOn(Scheduler.TaskPool)
.Subscribe(onNext: messageObject =>
{
if (messageObject != null)
{
QueueMessageRecievedHandler(messageObject);
}
},
onError: exceptionMessage =>
{
QueueMessageReceivingExceptionOccurredHandler(exceptionMessage);
});
}
 
static IObservable<object> StartDequeueing()
{
return Observable.Create<object>(observer =>
{
var anonyFunction = Observable
.FromAsyncPattern
(
(callbackObject, stateObject) => msmqQueue.BeginReceive(TimeSpan.FromMilliseconds(10), stateObject, callbackObject),
asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject)
)()
.Select(queueMessage =>
{
return queueMessage.Body;
});
 
return anonyFunction.Catch<object, Exception>(exception =>
{
return Observable.Empty<object>();
})
.Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
 
})
.Catch<object, MessageQueueException>(exception => { return Observable.Empty<object>(); })
.Catch<object, Exception>(exceptionGeneric => { return Observable.Empty<object>(); });
}
 
static void QueueMessageRecievedHandler(object queueMessage)
{
Console.WriteLine("Message Received from Queue. Congrats !!");
}
 
static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
{
Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");
}
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.