Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
MSMQ using Rx - Code snippet for MSMQ receive timeout problem
using System;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace MSMQPubSubRx
{
class Program
{
public static MessageQueue msmqQueue;
public static string queueMessageString = "Test String to queue in the MSMQ";
static void Main(string[] args)
{
msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });
// Part of code to run for queueing of messages in message queue.
//for (int i = 0; ; i++)
//{
// msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
//}
// Part of code to run for dequeuing of message from message queue
StartQueueMessageReceivingProcess();
Console.ReadLine();
}
/// <summary>
/// Start receiving client messages using Rx & Observable.
/// </summary>
static void StartQueueMessageReceivingProcess()
{
var incomingMessage = StartDequeueing()
.Repeat()
//.TakeWhile(x => x != null)
.SubscribeOn(Scheduler.TaskPool)
.Subscribe(onNext: messageObject =>
{
if (messageObject != null)
{
QueueMessageRecievedHandler(messageObject);
}
},
onError: exceptionMessage =>
{
QueueMessageReceivingExceptionOccurredHandler(exceptionMessage);
});
}
static IObservable<object> StartDequeueing()
{
return Observable.Create<object>(observer =>
{
var anonyFunction = Observable
.FromAsyncPattern
(
(callbackObject, stateObject) => msmqQueue.BeginReceive(TimeSpan.FromMilliseconds(10), stateObject, callbackObject),
asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject)
)()
.Select(queueMessage =>
{
return queueMessage.Body;
});
return anonyFunction.Catch<object, Exception>(exception =>
{
return Observable.Empty<object>();
})
.Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
})
.Catch<object, MessageQueueException>(exception => { return Observable.Empty<object>(); })
.Catch<object, Exception>(exceptionGeneric => { return Observable.Empty<object>(); });
}
static void QueueMessageRecievedHandler(object queueMessage)
{
Console.WriteLine("Message Received from Queue. Congrats !!");
}
static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
{
Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.