Skip to content

Instantly share code, notes, and snippets.

@leachdaniel
Created May 13, 2018 19:56
Show Gist options
  • Save leachdaniel/c308eada8b3bd7c42e577a2bfba68b09 to your computer and use it in GitHub Desktop.
Save leachdaniel/c308eada8b3bd7c42e577a2bfba68b09 to your computer and use it in GitHub Desktop.
Determine if you can copy from the default exchange.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQPOCs
{
    /// <summary>
    /// Proof of concept: consumes from a work queue, attempts to bind a fanout
    /// exchange to the default exchange, and then publishes console input to
    /// the work queue through the default exchange.
    /// </summary>
    class Program
    {
        /// <summary>Host used when no host name is supplied on the command line.</summary>
        private const string DefaultHostName = "172.19.109.180";

        /// <summary>Queue that messages are published to and consumed from.</summary>
        private const string TaskQueueName = "task_queue";

        /// <summary>Fanout exchange that is meant to receive a copy of default-exchange traffic.</summary>
        private const string CopyExchangeName = "copyofdefault";

        /// <summary>
        /// Entry point. Connects to the broker, starts the consumer, tries the
        /// default-exchange copy experiment and then publishes console input.
        /// </summary>
        /// <param name="args">
        /// Optional. <c>args[0]</c> is the broker host name or IP; when omitted,
        /// <see cref="DefaultHostName"/> is used.
        /// </param>
        static void Main(string[] args)
        {
            // A broker is required, e.g.:
            //   docker run -d -p 15672:15672 -p 5672:5672 -p 5671:5671 rabbitmq:3-management
            // Pass the container IP as the first argument, or change DefaultHostName.
            string hostName = (args != null && args.Length > 0) ? args[0] : DefaultHostName;

            var factory = new ConnectionFactory() { HostName = hostName };
            using (var connection = factory.CreateConnection())
            using (var consumerChannel = connection.CreateModel())
            {
                // The consumer channel stays open for the whole session so that
                // deliveries keep arriving while messages are being published.
                ConsumeMessages(consumerChannel);
                CopyToExchange(connection);
                PublishMessages(connection);
            }
        }

        /// <summary>
        /// Reads lines from the console and publishes each one until the user
        /// enters a blank line or the input stream ends.
        /// </summary>
        /// <param name="connection">Open connection used to create the publishing channel.</param>
        private static void PublishMessages(IConnection connection)
        {
            using (var channel = connection.CreateModel())
            {
                while (true)
                {
                    Console.WriteLine(" Enter a message or nothing to stop.");
                    string message = Console.ReadLine();

                    // Check before publishing: the terminating blank line must not
                    // be sent, and ReadLine returns null at end of input, which
                    // Encoding.GetBytes would reject with ArgumentNullException.
                    if (string.IsNullOrWhiteSpace(message))
                    {
                        break;
                    }

                    PublishMessage(message, channel);
                }
            }
        }

        /// <summary>
        /// Tries to copy messages from the default exchange to a new fanout
        /// exchange by binding the new exchange to the default one.
        /// </summary>
        /// <remarks>
        /// The original experiment found that this is not allowed:
        /// RabbitMQ.Client.Exceptions.OperationInterruptedException:
        /// 'The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403,
        /// text="ACCESS_REFUSED - operation not permitted on the default exchange",
        /// classId=40, methodId=30, cause='
        /// That failure is caught and reported so the rest of the demo still runs.
        /// </remarks>
        /// <param name="connection">Open connection used to create a short-lived channel.</param>
        private static void CopyToExchange(IConnection connection)
        {
            // Let's see if we can copy messages from the default exchange to a new
            // exchange; the theory is that you can subscribe to the default exchange.
            try
            {
                using (var channel = connection.CreateModel())
                {
                    // Positional arguments: durable = false, autoDelete = true.
                    channel.ExchangeDeclare(CopyExchangeName, "fanout", false, true);

                    // Destination = the copy exchange, source = "" (the default exchange).
                    channel.ExchangeBind(CopyExchangeName, "", TaskQueueName);

                    var qd = channel.QueueDeclare(exclusive: true);
                    channel.QueueBind(qd.QueueName, CopyExchangeName, "");
                }
            }
            catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex)
            {
                // Expected outcome of the experiment (see remarks). Only this
                // exception type is handled; anything else still propagates.
                Console.WriteLine(" [!] Cannot copy from the default exchange: {0}", ex.Message);
            }
        }

        /// <summary>
        /// Declares the task queue and registers a consumer on it that prints each
        /// message, simulates work and then acknowledges the delivery.
        /// </summary>
        /// <param name="channel">
        /// Channel to consume on. It must remain open for as long as messages
        /// should be received; the caller owns and disposes it.
        /// </param>
        private static void ConsumeMessages(IModel channel)
        {
            channel.QueueDeclare(queue: TaskQueueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // prefetchCount: 1 together with manual acks (autoAck: false below).
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                // Simulated workload: sleep one second per '.' in the message.
                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");

                // Acknowledge only after the simulated work has finished.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: TaskQueueName,
                                 autoAck: false,
                                 consumer: consumer);
        }

        /// <summary>
        /// Publishes one UTF-8 encoded message to the task queue via the default
        /// exchange, with the Persistent property set.
        /// </summary>
        /// <param name="message">Text to publish; must not be null.</param>
        /// <param name="channel">Open channel to publish on.</param>
        private static void PublishMessage(string message, IModel channel)
        {
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // exchange "" is the default exchange; the routing key names the queue.
            channel.BasicPublish(exchange: "",
                                 routingKey: TaskQueueName,
                                 basicProperties: properties,
                                 body: body);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment