MSMQ using Rx - Code snippet for MSMQ receive timeout problem
using System;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace MSMQPubSubRx
{
    class Program
    {
        public static MessageQueue msmqQueue;
        public static string queueMessageString = "Test String to queue in the MSMQ";

        static void Main(string[] args)
        {
            msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
            msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });

            // Part of code to run for queueing of messages in message queue.
            //for (int i = 0; ; i++)
            //    msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
            // Part of code to run for dequeuing of message from message queue
            StartQueueMessageReceivingProcess();
            Console.ReadLine(); // assumed: keeps the process alive while the async receive runs
        }

        /// <summary>
        /// Start receiving client messages using Rx and Observable.
        /// </summary>
        static void StartQueueMessageReceivingProcess()
        {
            var incomingMessage = StartDequeueing()
                //.TakeWhile(x => x != null)
                .Subscribe(onNext: messageObject =>
                {
                    if (messageObject != null)
                        QueueMessageRecievedHandler(messageObject);
                },
                onError: exceptionMessage =>
                {
                    QueueMessageReceivingExceptionOccurredHandler(exceptionMessage);
                });
        }

        static IObservable<object> StartDequeueing()
        {
            return Observable.Create<object>(observer =>
            {
                // Assumed wrapper: FromAsyncPattern<Message> matches the Begin/End delegate pair below.
                var anonyFunction = Observable
                    .FromAsyncPattern<Message>(
                        (callbackObject, stateObject) => msmqQueue.BeginReceive(TimeSpan.FromMilliseconds(10), stateObject, callbackObject),
                        asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject)
                    )()
                    .Select(queueMessage =>
                    {
                        return queueMessage.Body;
                    });
                return anonyFunction.Catch<object, Exception>(exception =>
                {
                    return Observable.Empty<object>();
                })
                .Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
            })
            .Catch<object, MessageQueueException>(exception => { return Observable.Empty<object>(); })
            .Catch<object, Exception>(exceptionGeneric => { return Observable.Empty<object>(); });
        }

        static void QueueMessageRecievedHandler(object queueMessage)
        {
            Console.WriteLine("Message Received from Queue. Congrats !!");
        }

        static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
        {
            Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");
        }
    }
}

Nice implementation, especially around the error handling. Did you ever get this resolved? I think you just need an onCompleted handler that calls BeginReceive again, so that Rx keeps "listening" after each message or timeout. You may also want to consider BeginPeek instead, especially if you are using a transactional queue. I'm working on a similar project using Rx and MSMQ; if you've completed this code, I would love to see an example of the finished product. Thanks!
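
For what it's worth, here is a minimal sketch of the "listen again" idea, assuming Rx's `Repeat` operator is an acceptable stand-in for a hand-written completion handler. `QueueListener` and `ListenForever` are hypothetical names that do not appear in the gist. The sketch treats a receive timeout as "queue was empty" and lets any other `MessageQueueException` reach the subscriber. It has not been run against a real queue.

```csharp
using System;
using System.Messaging;
using System.Reactive.Linq;

// Hypothetical helper, not part of the original gist.
static class QueueListener
{
    public static IObservable<object> ListenForever(MessageQueue queue, TimeSpan timeout)
    {
        // FromAsyncPattern returns a factory; each invocation starts one
        // BeginReceive/EndReceive pair and yields at most one message.
        Func<IObservable<Message>> receiveOnce = Observable.FromAsyncPattern<Message>(
            (callback, state) => queue.BeginReceive(timeout, state, callback),
            asyncResult => queue.EndReceive(asyncResult));

        return Observable
            // Defer calls the factory again on every (re)subscription.
            .Defer(receiveOnce)
            .Select(message => message.Body)
            // EndReceive throws with IOTimeout when nothing arrived in time;
            // turn that into an empty sequence and rethrow everything else.
            .Catch<object, MessageQueueException>(ex =>
                ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout
                    ? Observable.Empty<object>()
                    : Observable.Throw<object>(ex))
            // Resubscribe after each completion, which issues a fresh BeginReceive.
            .Repeat();
    }
}
```

Usage would look something like `var subscription = QueueListener.ListenForever(msmqQueue, TimeSpan.FromSeconds(5)).Subscribe(QueueMessageRecievedHandler, QueueMessageReceivingExceptionOccurredHandler);`, with `subscription.Dispose()` to stop listening. A timeout longer than the gist's 10 ms should mean fewer timeout exceptions per second on an idle queue. One caveat, also an assumption on my part: a receive that is already pending when the subscription is disposed may still take a message off the queue, which is one reason the BeginPeek suggestion may be worth a look.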
