Created
May 13, 2018 19:56
-
-
Save leachdaniel/c308eada8b3bd7c42e577a2bfba68b09 to your computer and use it in GitHub Desktop.
Determine if you can copy from the default exchange.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
using System; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace RabbitMQPOCs | |
{ | |
class Program | |
{ | |
/// <summary>
/// Entry point: opens one broker connection, starts the "task_queue" consumer on its own
/// channel, runs the copy-from-default-exchange experiment, then publishes console input.
/// </summary>
/// <param name="args">
/// Optional. When the first argument is present and non-blank it is used as the broker
/// host name; otherwise the built-in fallback address is used.
/// </param>
static void Main(string[] args)
{
    // A broker must be reachable before running this, e.g. start one in docker:
    //   docker run -d -p 15672:15672 -p 5672:5672 -p 5671:5671 rabbitmq:3-management
    // then pass the container IP as the first argument (or change the fallback below).
    const string fallbackHostName = "172.19.109.180";
    string hostName = args != null && args.Length > 0 && !string.IsNullOrWhiteSpace(args[0])
        ? args[0]
        : fallbackHostName;

    var factory = new ConnectionFactory() { HostName = hostName };

    // The consumer channel stays open for as long as the publish loop runs; both the
    // channel and the connection are disposed once PublishMessages returns (or throws).
    using (var connection = factory.CreateConnection())
    using (var consumerChannel = connection.CreateModel())
    {
        ConsumeMessages(consumerChannel);
        CopyToExchange(connection);
        PublishMessages(connection);
    }
}
/// <summary>
/// Reads lines from the console and publishes each one via <c>PublishMessage</c> on a
/// channel created for this loop, until the user enters nothing.
/// </summary>
/// <param name="connection">Open connection used to create the publishing channel.</param>
private static void PublishMessages(IConnection connection)
{
    using (var channel = connection.CreateModel())
    {
        while (true)
        {
            Console.WriteLine(" Enter a message or nothing to stop.");
            string message = Console.ReadLine();

            // Check the stop condition BEFORE publishing: a blank line is the stop signal,
            // not a message, and ReadLine returns null at end of input, which would make
            // Encoding.UTF8.GetBytes throw inside PublishMessage.
            if (string.IsNullOrWhiteSpace(message))
            {
                break;
            }

            PublishMessage(message, channel);
        }
    }
}
/// <summary>
/// Experiment: try to copy messages from the default exchange into a new fanout exchange
/// by binding the new exchange to the default one, then bind a fresh queue to the copy.
/// </summary>
/// <remarks>
/// The theory was that the default exchange can be subscribed to. It cannot; the recorded
/// result of running this was:
/// RabbitMQ.Client.Exceptions.OperationInterruptedException:
/// 'The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403,
/// text="ACCESS_REFUSED - operation not permitted on the default exchange",
/// classId=40, methodId=30, cause='
/// </remarks>
/// <param name="connection">Open connection used to create a short-lived channel.</param>
private static void CopyToExchange(IConnection connection)
{
    const string copyExchange = "copyofdefault";
    const string defaultExchange = "";

    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(copyExchange, "fanout", durable: false, autoDelete: true);

        // Arguments are (destination, source, routing key): copy <- default exchange.
        channel.ExchangeBind(copyExchange, defaultExchange, "task_queue");

        // Queue name is left for the broker to choose; it is read back from the result.
        var declaredQueue = channel.QueueDeclare(exclusive: true);
        channel.QueueBind(declaredQueue.QueueName, copyExchange, "");
    }
}
/// <summary>
/// Declares "task_queue" on the given channel and attaches an event-based consumer that
/// prints each message, sleeps one second per '.' in its text, then acknowledges it.
/// </summary>
/// <param name="channel">
/// Channel used for the declaration, the consumer and the acknowledgements. The caller
/// keeps ownership; this method does not dispose it.
/// </param>
private static void ConsumeMessages(IModel channel)
{
    const string queueName = "task_queue";

    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

    // prefetchCount of 1 together with manual acks (autoAck: false below).
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    Console.WriteLine(" [*] Waiting for messages.");

    var worker = new EventingBasicConsumer(channel);
    worker.Received += (sender, delivery) =>
    {
        var text = Encoding.UTF8.GetString(delivery.Body);
        Console.WriteLine(" [x] Received {0}", text);

        // Each '.' in the message stands for one second of simulated work.
        int dotCount = 0;
        foreach (char c in text)
        {
            if (c == '.')
            {
                dotCount++;
            }
        }
        Thread.Sleep(dotCount * 1000);

        Console.WriteLine(" [x] Done");

        // Acknowledge only after the work is finished, and only this delivery.
        channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
    };

    channel.BasicConsume(queue: queueName, autoAck: false, consumer: worker);
}
/// <summary>
/// Publishes one UTF-8 encoded text message to the exchange named "" with routing key
/// "task_queue", with the Persistent property set on the message.
/// </summary>
/// <param name="message">Text to send; it is encoded as UTF-8 bytes.</param>
/// <param name="channel">Open channel to publish on; not disposed here.</param>
private static void PublishMessage(string message, IModel channel)
{
    byte[] payload = Encoding.UTF8.GetBytes(message);

    var messageProperties = channel.CreateBasicProperties();
    messageProperties.Persistent = true;

    channel.BasicPublish(
        exchange: "",
        routingKey: "task_queue",
        basicProperties: messageProperties,
        body: payload);
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment