Skip to content

Instantly share code, notes, and snippets.

@acid24 acid24/publisher
Last active Jun 24, 2018

Embed
What would you like to do?
RabbitMQ publish confirms mandatory
<?php
use PhpAmqpLib\Message\AMQPMessage;
// Project root: two directory levels above the directory holding this script.
// Used below to locate the Composer autoloader.
define('ROOT_DIR', realpath(__DIR__ . '/../..'));

// Broker connection settings passed to AMQPStreamConnection.
const HOST = 'rabbitmq';
const PORT = 5672;
const USER = 'guest';
const PASSWD = 'guest';
const VHOST = '/';

// NOTE(review): presumably read by php-amqplib to turn on protocol debug
// output; it is declared before the autoloader is pulled in - confirm.
const AMQP_DEBUG = true;

require_once ROOT_DIR . '/library/vendor/autoload.php';
// Names of the exchange, queue and routing key used throughout this script.
$exchange = 'test.exchange';
$queue = 'test.queue';
$routingKey = 'my.routing.key';

// Connect to the broker with the settings defined at the top of the file.
$brokerConnection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    HOST,
    PORT,
    USER,
    PASSWD,
    VHOST
);

// Open a channel and switch it to publisher-confirm mode, so the handlers
// registered on it are notified about the fate of every published message.
$channel = $brokerConnection->channel();
$channel->confirm_select();
// Ack handler: prints a notice and dumps the message it is called with.
$channel->set_ack_handler(
    function (AMQPMessage $ackedMessage) {
        echo 'Message acked!' . PHP_EOL;
        var_dump($ackedMessage);
    }
);
// Nack handler: prints a notice, dumps the message, then publishes the same
// message again to the script's exchange/routing key with the mandatory flag.
// Note: the resend is unconditional - there is no limit on how many times a
// repeatedly nacked message is published again.
$channel->set_nack_handler(
    function (AMQPMessage $nackedMessage) use ($channel, $exchange, $routingKey) {
        echo 'Message nacked!' . PHP_EOL;
        var_dump($nackedMessage);

        // resend
        $mandatory = true;
        $channel->basic_publish($nackedMessage, $exchange, $routingKey, $mandatory);
    }
);
// Return listener: called when the broker hands back a message published with
// the mandatory flag because it could not be routed to any queue.
//
// Parameters (as supplied by the library): the reply code and text explaining
// the return, the exchange and routing key the message was published with, and
// the returned message itself.
//
// The message is published again, but only a limited number of times: an
// unroutable message stays unroutable until the bindings change, so an
// unconditional resend would be returned again immediately and keep the script
// looping inside wait_for_pending_acks_returns() forever.
$channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) use ($channel) {
    // Counter persists across invocations of this closure; it is shared by all
    // returned messages, which is sufficient for this single-message script.
    static $resendAttempts = 0;
    $maxResendAttempts = 3;

    echo "Message returned!", PHP_EOL;
    var_dump($message);

    if ($resendAttempts >= $maxResendAttempts) {
        echo "Giving up on returned message after ", $maxResendAttempts,
            " resend attempts (", $replyCode, " ", $replyText, ")", PHP_EOL;
        return;
    }
    ++$resendAttempts;

    // resend (4th argument: mandatory = true)
    $channel->basic_publish($message, $exchange, $routingKey, true);
});
// Declare a durable, non-auto-delete direct exchange.
$type = 'direct';
$passive = false;
$durable = true;
$autoDelete = false;
$channel->exchange_declare($exchange, $type, $passive, $durable, $autoDelete);

// Declare a durable, non-exclusive, non-auto-delete queue.
$exclusive = false;
$channel->queue_declare($queue, $passive, $durable, $exclusive, $autoDelete);

// Bind the queue to the exchange under the script's routing key.
$channel->queue_bind($queue, $exchange, $routingKey);
// Build a plain-text message; delivery_mode 2 marks it as persistent in AMQP.
$message = new AMQPMessage('hello!', array('content_type' => 'text/plain', 'delivery_mode' => 2));

try {
    // Publish with the mandatory flag so an unroutable message is handed back
    // to the return listener instead of being dropped by the broker.
    $mandatory = true;
    $channel->basic_publish($message, $exchange, $routingKey, $mandatory);

    // Block until the broker has acked, nacked or returned everything
    // published on this channel; the registered handlers run from in here.
    $channel->wait_for_pending_acks_returns();
} finally {
    // Always release the channel and the underlying connection, even when
    // publishing or waiting throws - the original script never closed them.
    $channel->close();
    $brokerConnection->close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.