Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample code using the Reactive Extensions + Azure Service Bus Queue
/// <summary>
/// Console sample that exposes an Azure Service Bus queue as a shared Rx stream and
/// prints messages flagged as commands and messages flagged as queries in different colors.
/// </summary>
internal class Program
{
    private const string QueueName = "colasbus";

    // Reads the first appSettings entry by position rather than by key.
    private static readonly string ConnectionString = ConfigurationManager.AppSettings.Get(0);

    private static QueueClient _queueClient;

    /// <summary>
    /// Creates the queue client, attaches the command and query subscribers, starts the
    /// message pump, and tears everything down once the user presses a key.
    /// </summary>
    private static void Main()
    {
        Console.ForegroundColor = ConsoleColor.DarkRed;
        Console.WriteLine("Creando el cliente...");
        CrearCliente();
        Console.WriteLine("Mensajes");
        Console.WriteLine(">>Press any key to cancel<<");
        Console.WriteLine("---------------------------");

        // Publish() without RefCount(): the pump must not start until BOTH subscribers are
        // attached, otherwise messages received between the two Subscribe calls are missed
        // by the second subscriber.
        // NOTE(review): pump-level failures are presumably reported through
        // OnMessageOptions.ExceptionReceived, which is not hooked here - confirm whether the
        // OnError handlers below can ever be reached.
        var observable = Observable.Create<BrokeredMessage>(
            observer =>
            {
                _queueClient.OnMessage(observer.OnNext, new OnMessageOptions());

                // Closing the client on disposal stops the pump; returning an empty
                // disposable would leave it dequeuing messages nobody observes.
                return Disposable.Create(() => _queueClient.Close());
            }).Publish();

        var comandos = SubscribeToFlagged(observable, "IsCommand", ConsoleColor.DarkYellow);
        var queries = SubscribeToFlagged(observable, "IsQuerie", ConsoleColor.DarkRed);

        // Start receiving only now that every subscriber is in place.
        var connection = observable.Connect();

        Console.ReadKey();
        comandos.Dispose();
        queries.Dispose();
        connection.Dispose();
        Console.WriteLine("Press any key to quit");
        Console.ReadKey();
    }

    /// <summary>
    /// Subscribes to the messages whose boolean property <paramref name="propertyName"/> is
    /// true and writes each message body to the console using <paramref name="color"/>.
    /// </summary>
    /// <param name="source">The shared message stream.</param>
    /// <param name="propertyName">Name of the message property used as the filter flag.</param>
    /// <param name="color">Console foreground color applied before printing a body.</param>
    /// <returns>The subscription; dispose it to stop printing.</returns>
    private static IDisposable SubscribeToFlagged(
        IObservable<BrokeredMessage> source,
        string propertyName,
        ConsoleColor color)
    {
        return source
            .Where(message => IsFlagSet(message, propertyName))
            .Subscribe(
                message => //OnNext
                {
                    Console.ForegroundColor = color;
                    Console.WriteLine(message.GetBody<string>());
                },
                error => Console.WriteLine(error.Message), //OnError
                () => Console.WriteLine("Complete")); //OnComplete
    }

    /// <summary>
    /// Returns true only when the message carries <paramref name="propertyName"/> and its
    /// value is the boolean <c>true</c>; a missing or non-boolean value yields false
    /// instead of throwing an <see cref="InvalidCastException"/>.
    /// </summary>
    private static bool IsFlagSet(BrokeredMessage message, string propertyName)
    {
        object value;
        return message.Properties.TryGetValue(propertyName, out value)
            && value is bool
            && (bool)value;
    }

    /// <summary>
    /// Creates the shared queue client from the configured connection string and queue name.
    /// </summary>
    private static void CrearCliente()
    {
        _queueClient = QueueClient.CreateFromConnectionString(ConnectionString, QueueName);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.