Created
August 9, 2016 22:27
-
-
Save GeorgeTsiokos/3cd921fc51b35c188f3b02addd93a143 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Disposables; | |
using System.Text; | |
using System.Threading; | |
using Microsoft.ApplicationInsights; | |
using Microsoft.ApplicationInsights.DataContracts; | |
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
namespace RabbitAppInsightsHelloWorld
{
    /// <summary>
    /// Console sample: publishes "Hello World!" to a RabbitMQ queue on a one-shot, self-rescheduling
    /// timer, consumes the messages on the same channel, and wraps each step in an
    /// Application Insights operation so the reported operation name can be printed.
    /// </summary>
    internal class Program
    {
        /// <summary>Queue that is declared, published to and consumed from.</summary>
        private const string QueueName = "hello";

        /// <summary>Delay, in milliseconds, before the first publish.</summary>
        private const int FirstPublishDelayMilliseconds = 1000;

        /// <summary>Delay, in milliseconds, between the end of one publish and the start of the next.</summary>
        private const int PublishIntervalMilliseconds = 2000;

        /// <summary>
        /// Starts the publisher and subscriber, waits for the user to request a stop, then tears
        /// everything down and flushes buffered telemetry.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        private static void Main(string[] args)
        {
            var telemetryClient = new TelemetryClient();

            using (var cancellationTokenSource = new CancellationTokenSource())
            {
                var rabbit = StartRabbit(telemetryClient, cancellationTokenSource.Token);

                using (telemetryClient.StartOperation<RequestTelemetry>("ConsoleLoop"))
                {
                    WaitForStopRequest();
                    // Cancel before disposing: the publisher callback and the subscriber
                    // registration both key off this token.
                    cancellationTokenSource.Cancel();
                }

                rabbit.Dispose();
            }

            // Push buffered telemetry out before the process can exit; the prompt below
            // leaves time for the transmission to complete.
            telemetryClient.Flush();

            Console.WriteLine("Press <enter> to quit");
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to RabbitMQ on localhost, declares the queue and starts the publisher and
        /// the subscriber, all inside a "Startup" telemetry operation.
        /// </summary>
        /// <param name="telemetryClient">Client used to start the telemetry operations.</param>
        /// <param name="cancellationToken">Token handed to the publisher and the subscriber.</param>
        /// <returns>A handle that stops the publisher and releases the channel and connection.</returns>
        private static IDisposable StartRabbit(TelemetryClient telemetryClient, CancellationToken cancellationToken)
        {
            using (telemetryClient.StartOperation<RequestTelemetry>("Startup"))
            {
                var factory = new ConnectionFactory {HostName = "localhost"};
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                QueueDeclare(channel, telemetryClient);

                // NOTE(review): execution-context flow is suppressed while the timer and the
                // consumer are created, presumably so their callbacks do not inherit the startup
                // operation as telemetry parent - confirm.
                Timer publisher;
                // "PubisherStartup" (sic): spelling kept so the emitted operation name is unchanged.
                using (telemetryClient.StartOperation<RequestTelemetry>("PubisherStartup"))
                using (ExecutionContext.SuppressFlow())
                {
                    publisher = CreatePublisher(channel, telemetryClient, cancellationToken);
                }

                EventingBasicConsumer subscriber;
                using (telemetryClient.StartOperation<RequestTelemetry>("SubscriberStartup"))
                using (ExecutionContext.SuppressFlow())
                {
                    subscriber = CreateSubscriber(channel, telemetryClient, cancellationToken);
                }

                return Disposable.Create(() =>
                {
                    // Wait for any in-flight publish before the channel goes away.
                    StopPublisher(publisher);
                    // NOTE(review): OnCancel() looks like a callback meant to be raised by the
                    // client library; confirm that calling it directly (rather than
                    // channel.BasicCancel) is intended.
                    subscriber.OnCancel();
                    channel.Dispose();
                    connection.Dispose();
                });
            }
        }

        /// <summary>
        /// Prompts on the console until the user submits an empty or whitespace-only line
        /// (or the input stream ends).
        /// </summary>
        private static void WaitForStopRequest()
        {
            string input;
            do
            {
                Console.WriteLine("Press <enter> to stop");
                input = Console.ReadLine();
            } while (!string.IsNullOrWhiteSpace(input));
        }

        /// <summary>
        /// Disposes the publisher timer and blocks until every callback that was already
        /// running has finished.
        /// </summary>
        /// <param name="publisher">Timer returned by <see cref="CreatePublisher"/>.</param>
        private static void StopPublisher(Timer publisher)
        {
            using (var callbacksFinished = new ManualResetEvent(false))
            {
                // Dispose(WaitHandle) returns false when the timer was already disposed;
                // the handle is not signaled in that case, so only wait on success.
                if (publisher.Dispose(callbacksFinished))
                {
                    callbacksFinished.WaitOne();
                }
            }
        }

        /// <summary>
        /// Declares the queue inside a dependency telemetry operation named after
        /// <c>IModel.QueueDeclare</c>.
        /// </summary>
        /// <param name="channel">Channel on which the queue is declared.</param>
        /// <param name="telemetryClient">Client used to start the telemetry operation.</param>
        private static void QueueDeclare(IModel channel, TelemetryClient telemetryClient)
        {
            using (telemetryClient.StartOperation<DependencyTelemetry>(nameof(IModel.QueueDeclare)))
            {
                channel.QueueDeclare(QueueName, false, false, false, null);
            }
        }

        /// <summary>
        /// Creates a one-shot timer that publishes "Hello World!" to the queue and then re-arms
        /// itself, until <paramref name="token"/> is cancelled or the timer is disposed.
        /// </summary>
        /// <param name="channel">Channel used for publishing.</param>
        /// <param name="telemetryClient">Client used to wrap each publish in a dependency operation.</param>
        /// <param name="token">Token that stops further publishing once cancelled.</param>
        /// <returns>The started timer; disposing it stops the publisher.</returns>
        private static Timer CreatePublisher(IModel channel, TelemetryClient telemetryClient, CancellationToken token)
        {
            var message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            Timer timer = null;
            TimerCallback timerCallback = _ =>
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }

                using (var operation = telemetryClient.StartOperation<DependencyTelemetry>(nameof(IModel.BasicPublish)))
                {
                    channel.BasicPublish("", QueueName, null, body);
                    Console.WriteLine($" [x] Sent {message}\t\tTelemetry Operation = {operation.Telemetry?.Context?.Operation?.Name}");
                }

                // Shutdown may have started while the publish was in progress; do not re-arm.
                if (token.IsCancellationRequested)
                {
                    return;
                }

                try
                {
                    timer.Change(PublishIntervalMilliseconds, Timeout.Infinite);
                }
                catch (ObjectDisposedException)
                {
                    // The timer was disposed between the check above and the re-arm;
                    // there is nothing left to schedule.
                }
            };

            // The single-argument constructor creates the timer unstarted, so the closure
            // variable is assigned before the first callback can run.
            timer = new Timer(timerCallback);
            timer.Change(FirstPublishDelayMilliseconds, Timeout.Infinite);
            return timer;
        }

        /// <summary>
        /// Creates a consumer on the queue that prints every received message together with
        /// the name of the telemetry operation it ran under.
        /// </summary>
        /// <param name="channel">Channel used for consuming.</param>
        /// <param name="telemetryClient">Client used to wrap each delivery in a dependency operation.</param>
        /// <param name="token">Token whose cancellation detaches the message handler.</param>
        /// <returns>The consumer registered on the channel.</returns>
        private static EventingBasicConsumer CreateSubscriber(IModel channel, TelemetryClient telemetryClient, CancellationToken token)
        {
            var consumer = new EventingBasicConsumer(channel);
            EventHandler<BasicDeliverEventArgs> onReceived = (model, ea) =>
            {
                using (var operation = telemetryClient.StartOperation<DependencyTelemetry>(nameof(EventingBasicConsumer.Received)))
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine($" [x] Received {message}\tTelemetry Operation = {operation.Telemetry?.Context?.Operation?.Name}");
                }
            };

            consumer.Received += onReceived;
            // Stop handling deliveries as soon as shutdown is requested.
            token.Register(() => consumer.Received -= onReceived);
            channel.BasicConsume(queue: QueueName, noAck: true, consumer: consumer);
            return consumer;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment