Skip to content

Instantly share code, notes, and snippets.

@f2r
Created October 1, 2018 09:54
Show Gist options
  • Save f2r/0632b651a3b27a2c189652b753f45e49 to your computer and use it in GitHub Desktop.
Save f2r/0632b651a3b27a2c189652b753f45e49 to your computer and use it in GitHub Desktop.
<?php
require 'vendor/autoload.php';
// Connect to the broker on localhost:5672 with the guest/guest credentials
// and open the channel that every later call in this script goes through.
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Make sure the queue exists before publishing. Positional flags after the
// name are: passive = false, durable = true, exclusive = false,
// auto_delete = false.
$channel->queue_declare("queue.test", false, true, false, false);

// Seed the queue with the demo messages, in order. Each body has the form
// "<type> <id>"; the consumer callback picks its acknowledgement strategy
// from the <type> word. The empty exchange name routes straight to the queue
// named by the routing key.
foreach (['unknow 1', 'ack 2', 'ack 3', 'end 4'] as $body) {
    $channel->basic_publish(new \PhpAmqpLib\Message\AMQPMessage($body), '', 'queue.test');
}
/**
 * Consumer callback: settles each delivery according to the first word of
 * its body, which is expected to look like "<type> <id>".
 *
 *  - redelivered message (any type): acknowledged immediately;
 *  - "ack":  acknowledged;
 *  - "nack": negatively acknowledged with requeue enabled;
 *  - "end":  acknowledged, then this consumer is cancelled;
 *  - any other type: intentionally left without acknowledgement.
 *
 * @param \PhpAmqpLib\Message\AMQPMessage $msg the delivered message
 * @return void
 */
$callback = function (\PhpAmqpLib\Message\AMQPMessage $msg) {
    list($type, $id) = explode(' ', $msg->body, 2);
    echo "Message $id => $type\n";

    $info = $msg->delivery_info;

    // A message the broker hands out a second time is simply confirmed so it
    // leaves the queue, regardless of what its type says.
    if ($info['redelivered']) {
        echo "*** redelivered\n";
        $info['channel']->basic_ack($info['delivery_tag']);
        return;
    }

    if ($type === 'ack') {
        $info['channel']->basic_ack($info['delivery_tag']);
    } elseif ($type === 'nack') {
        // Arguments: delivery tag, multiple = false, requeue = true.
        $info['channel']->basic_nack($info['delivery_tag'], false, true);
    } elseif ($type === 'end') {
        // Confirm the terminator message first, then stop consuming.
        $info['channel']->basic_ack($info['delivery_tag']);
        $info['channel']->basic_cancel($info['consumer_tag']);
    }
    // Any other type falls through on purpose: no acknowledgement is sent.
};
// Prints the queue name together with the message and consumer counts that
// the broker returns when the queue is re-declared with the same arguments.
$printQueueStats = function () use ($channel) {
    list($queueName, $messageCount, $consumerCount) = $channel->queue_declare("queue.test", false, true, false, false);
    echo "Queue: $queueName, message count: $messageCount, consumer count: $consumerCount\n";
};

// Snapshot before any consumer is attached.
$printQueueStats();

// Register the callback as a consumer. The empty consumer tag leaves the tag
// choice to the broker/library; the four flags are no_local, no_ack,
// exclusive and nowait, all false, so deliveries need explicit acknowledgement.
$channel->basic_consume('queue.test', '', false, false, false, false, $callback);

// Snapshot again now that the consumer is registered.
$printQueueStats();

// Keep processing frames while the channel still has consumer callbacks
// registered. NOTE(review): presumably the list empties once the callback
// cancels the consumer on the "end" message, which is what ends this loop.
while (count($channel->callbacks) > 0) {
    $channel->wait();
}

// Orderly shutdown: channel first, then the underlying connection.
$channel->close();
$connection->close();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment