Skip to content

Instantly share code, notes, and snippets.

@GeorgeTsiokos
Created August 9, 2016 22:27
Show Gist options
  • Save GeorgeTsiokos/3cd921fc51b35c188f3b02addd93a143 to your computer and use it in GitHub Desktop.
Save GeorgeTsiokos/3cd921fc51b35c188f3b02addd93a143 to your computer and use it in GitHub Desktop.
using System;
using System.Reactive.Disposables;
using System.Text;
using System.Threading;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitAppInsightsHelloWorld
{
/// <summary>
/// Console demo: publishes "Hello World!" to the "hello" queue of a RabbitMQ broker on
/// localhost from a timer, consumes the messages on the same channel, and wraps each step
/// in an Application Insights telemetry operation whose name is echoed to the console.
/// </summary>
internal class Program
{
    /// <summary>
    /// Entry point. Starts the publisher and subscriber, waits for an empty line on stdin,
    /// then cancels, tears everything down and flushes telemetry.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        var telemetryClient = new TelemetryClient();
        var cancellationTokenSource = new CancellationTokenSource();
        var cancellationToken = cancellationTokenSource.Token;

        IConnection connection = null;
        IModel channel = null;
        Timer publisher = null;
        EventingBasicConsumer subscriber = null;

        // The teardown is defined before anything is created and null-checks every resource,
        // so a failure half-way through startup still releases whatever was already opened.
        // Order is the reverse of creation: timer, consumer, channel, connection.
        var rabbit = Disposable.Create(() =>
        {
            publisher?.Dispose();
            subscriber?.OnCancel();
            channel?.Dispose();
            connection?.Dispose();
        });

        try
        {
            using (telemetryClient.StartOperation<RequestTelemetry>("Startup"))
            {
                var factory = new ConnectionFactory {HostName = "localhost"};
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                QueueDeclare(channel, telemetryClient);

                using (telemetryClient.StartOperation<RequestTelemetry>("PublisherStartup"))
                {
                    // NOTE(review): flow is suppressed presumably so the timer callback does not
                    // inherit the ambient startup operation - confirm against the printed names.
                    using (ExecutionContext.SuppressFlow())
                    {
                        publisher = CreatePublisher(channel, telemetryClient, cancellationToken);
                    }
                }

                using (telemetryClient.StartOperation<RequestTelemetry>("SubscriberStartup"))
                {
                    using (ExecutionContext.SuppressFlow())
                    {
                        subscriber = CreateSubscriber(channel, telemetryClient, cancellationToken);
                    }
                }
            }

            using (telemetryClient.StartOperation<RequestTelemetry>("ConsoleLoop"))
            {
                while (true)
                {
                    Console.WriteLine("Press <enter> to stop");
                    var input = Console.ReadLine();
                    // A null result (end of input) also counts as "stop".
                    if (string.IsNullOrWhiteSpace(input))
                    {
                        cancellationTokenSource.Cancel();
                        break;
                    }
                }
            }
        }
        finally
        {
            // Cancel first (a no-op if the console loop already did) so that in-flight
            // callbacks see the token as cancelled before their resources disappear.
            cancellationTokenSource.Cancel();
            rabbit.Dispose();
            // Push buffered telemetry out before the process is allowed to end.
            telemetryClient.Flush();
            cancellationTokenSource.Dispose();
        }

        Console.WriteLine("Press <enter> to quit");
        Console.ReadLine();
    }

    /// <summary>
    /// Declares the "hello" queue, tracked as a dependency operation named after
    /// <see cref="IModel.QueueDeclare"/>.
    /// </summary>
    /// <param name="channel">Channel on which the queue is declared.</param>
    /// <param name="telemetryClient">Client used to start the telemetry operation.</param>
    private static void QueueDeclare(IModel channel, TelemetryClient telemetryClient)
    {
        using (telemetryClient.StartOperation<DependencyTelemetry>(nameof(IModel.QueueDeclare)))
        {
            channel.QueueDeclare("hello", false, false, false, null);
        }
    }

    /// <summary>
    /// Creates a one-shot timer that publishes "Hello World!" to the "hello" queue and then
    /// re-arms itself: first tick after 1000 ms, each following tick 2000 ms after the
    /// previous publish completes.
    /// </summary>
    /// <param name="channel">Channel used for publishing.</param>
    /// <param name="telemetryClient">Client used to track each publish as a dependency.</param>
    /// <param name="token">When cancelled, the next tick returns without publishing or re-arming.</param>
    /// <returns>The armed timer; the caller owns it and must dispose it.</returns>
    private static Timer CreatePublisher(IModel channel, TelemetryClient telemetryClient, CancellationToken token)
    {
        var message = "Hello World!";
        var body = Encoding.UTF8.GetBytes(message);
        Timer timer = null;
        TimerCallback timerCallback = _ =>
        {
            if (token.IsCancellationRequested)
            {
                return;
            }
            try
            {
                using (var operation = telemetryClient.StartOperation<DependencyTelemetry>(nameof(IModel.BasicPublish)))
                {
                    channel.BasicPublish("", "hello", null, body);
                    Console.WriteLine($" [x] Sent {message}\t\tTelemetry Operation = {operation.Telemetry?.Context?.Operation?.Name}");
                }
                // One-shot re-arm: ticks can never overlap, whatever a publish takes.
                timer.Change(2000, Timeout.Infinite);
            }
            catch (Exception) when (token.IsCancellationRequested)
            {
                // Shutdown raced with this tick: the channel or the timer was disposed after
                // the cancellation check above. An exception escaping a timer callback would
                // take the process down, so it is dropped here. Failures while the token is
                // NOT cancelled are deliberately left unhandled, as before.
            }
        };
        // Created unarmed so that 'timer' is assigned before the callback can possibly run.
        timer = new Timer(timerCallback);
        timer.Change(1000, Timeout.Infinite);
        return timer;
    }

    /// <summary>
    /// Attaches a consumer to the "hello" queue that prints every received message together
    /// with the name of the telemetry operation it ran under.
    /// </summary>
    /// <param name="channel">Channel to consume from.</param>
    /// <param name="telemetryClient">Client used to track each delivery as a dependency.</param>
    /// <param name="token">When cancelled, the receive handler is detached from the consumer.</param>
    /// <returns>The consumer registered on <paramref name="channel"/>.</returns>
    private static EventingBasicConsumer CreateSubscriber(IModel channel, TelemetryClient telemetryClient, CancellationToken token)
    {
        var consumer = new EventingBasicConsumer(channel);
        EventHandler<BasicDeliverEventArgs> eventHandler = (model, ea) =>
        {
            using (var operation = telemetryClient.StartOperation<DependencyTelemetry>(nameof(EventingBasicConsumer.Received)))
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received {message}\tTelemetry Operation = {operation.Telemetry?.Context?.Operation?.Name}");
            }
        };
        consumer.Received += eventHandler;
        // Stop reacting to deliveries as soon as shutdown is requested.
        token.Register(() => consumer.Received -= eventHandler);
        // noAck: true - messages are not acknowledged by this consumer.
        channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
        return consumer;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment