MSMQ using Rx - Code snippet for MSMQ receive timeout problem
using System;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace MSMQPubSubRx
class Program
public static MessageQueue msmqQueue;
public static string queueMessageString = "Test String to queue in the MSMQ";
static void Main(string[] args)
msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });
// Part of code to run for queueing of messages in message queue.
//for (int i = 0; ; i++)
// msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
// Part of code to run for dequeuing of message from message queue
/// <summary>
/// Start receiving client messages using Rx & Observable.
/// </summary>
static void StartQueueMessageReceivingProcess()
var incomingMessage = StartDequeueing()
//.TakeWhile(x => x != null)
.Subscribe(onNext: messageObject =>
if (messageObject != null)
onError: exceptionMessage =>
static IObservable<object> StartDequeueing()
return Observable.Create<object>(observer =>
var anonyFunction = Observable
(callbackObject, stateObject) => msmqQueue.BeginReceive(TimeSpan.FromMilliseconds(10), stateObject, callbackObject),
asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject)
.Select(queueMessage =>
return queueMessage.Body;
return anonyFunction.Catch<object, Exception>(exception =>
return Observable.Empty<object>();
.Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
.Catch<object, MessageQueueException>(exception => { return Observable.Empty<object>(); })
.Catch<object, Exception>(exceptionGeneric => { return Observable.Empty<object>(); });
static void QueueMessageRecievedHandler(object queueMessage)
Console.WriteLine("Message Received from Queue. Congrats !!");
static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");

Nice inplementation, especially around the error handling. Did you ever get this resolved? I think you just need an onCompletion to BeginReceive again so the Rx "listens" again. You may also want to consider BeginPeek instead, especially if using a transactional queue. I'm working on creating a similar project using Rx and MSMQ, if you've completed this code I would love to see an example of the finished product. Thanks!

