Created
June 23, 2015 20:53
-
-
Save videlalvaro/45a3d16754d462bf6ccf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
use PhpAmqpLib\Connection\AMQPConnection; | |
use PhpAmqpLib\Message\AMQPMessage; | |
use PhpAmqpLib\Wire\AMQPTable; | |
// config.php is expected to define the HOST, PORT, USER, PASS and VHOST
// constants consumed by the connection below.
include __DIR__ . '/config.php';

// Open the broker connection, then a channel on it; every AMQP call in the
// rest of this script goes through $ch.
$conn = new AMQPConnection(
    HOST,
    PORT,
    USER,
    PASS,
    VHOST
);
$ch = $conn->channel();
/*
 * Exchange declarations.
 *
 * Positional arguments of exchange_declare() as used here:
 *   exchange, type, passive, durable, auto_delete, internal, nowait
 *   (the first call additionally passes an arguments table).
 *
 * 'delayed' is declared with the 'x-delayed-message' type and carries
 * "fanout" in its 'x-delayed-type' argument.
 * NOTE(review): the 'x-delayed-message' type is presumably provided by the
 * RabbitMQ delayed-message-exchange plugin -- confirm it is enabled.
 */
$delayedExchangeArgs = new AMQPTable(array(
    'x-delayed-type' => 'fanout'
));
$ch->exchange_declare(
    'delayed',
    'x-delayed-message',
    false,
    false,
    false,
    false,
    false,
    $delayedExchangeArgs
);

// Plain topic exchange that the script publishes to.
$ch->exchange_declare('resource_management', 'topic', false, false, false);
/*
 * Queue declaration and bindings.
 *
 * Positional arguments of queue_declare():
 *   queue, passive, durable, exclusive, auto_delete, nowait, arguments, ticket
 *
 * The queue names 'delayed' as its 'x-dead-letter-exchange'.
 */
$queueArgs = new AMQPTable(array(
    'x-dead-letter-exchange' => 'delayed'
));
$ch->queue_declare(
    'resource_management',
    false,
    false,
    false,
    false,
    false,
    $queueArgs
);

// The same queue is bound twice: to the topic exchange under
// 'my_routing_key', and to the 'delayed' exchange with the '#' binding key.
$ch->queue_bind('resource_management', 'resource_management', 'my_routing_key');
$ch->queue_bind('resource_management', 'delayed', '#');
// Build the outgoing message: body 'hello', delivery_mode 2, and an
// 'x-delay' application header of 7000.
// NOTE(review): 'x-delay' is presumably interpreted as milliseconds by the
// delayed exchange -- confirm against the plugin documentation.
$delayHeaders = new AMQPTable(array('x-delay' => 7000));

$outgoing = new AMQPMessage('hello', array('delivery_mode' => 2));
$outgoing->set('application_headers', $delayHeaders);

// Publish to the topic exchange with the routing key the queue is bound under.
$ch->basic_publish($outgoing, 'resource_management', 'my_routing_key');
/**
 * Consumer callback: dumps the delivery's 'x-delay' application header and
 * then negatively acknowledges the delivery.
 *
 * A delivery without application headers, or without an 'x-delay' entry,
 * is reported as NULL instead of raising, so the nack below is always sent.
 *
 * @param \PhpAmqpLib\Message\AMQPMessage $msg Delivered message; its
 *        delivery_info must carry 'channel' and 'delivery_tag'.
 * @return void
 */
function process_message($msg) {
    $delay = null;

    // get() throws when the property was never set, so probe with has() first.
    if ($msg->has('application_headers')) {
        $headers = $msg->get('application_headers')->getNativeData();
        if (isset($headers['x-delay'])) {
            $delay = $headers['x-delay'];
        }
    }
    var_dump($delay);

    // Nack with the library's default flags.
    // NOTE(review): presumably requeue defaults to false, so the broker routes
    // the message to the queue's dead-letter exchange -- confirm.
    $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
}
// Register process_message() as the consumer callback for the queue.
$ch->basic_consume(
    'resource_management', // queue: queue to take messages from
    '',                    // consumer_tag: consumer identifier (empty here)
    false,                 // no_local: don't receive messages published by this consumer
    false,                 // no_ack: false, so the consumer acknowledges deliveries itself
    false,                 // exclusive: no exclusive consumer access requested
    false,                 // nowait
    'process_message'      // callback: a PHP callback
);
/**
 * Closes the channel first and the connection second.
 *
 * @param \PhpAmqpLib\Channel\AMQPChannel $ch
 * @param \PhpAmqpLib\Connection\AbstractConnection $conn
 */
function shutdown($ch, $conn)
{
    // Order matters: the channel is closed before the connection it runs on.
    foreach (array($ch, $conn) as $handle) {
        $handle->close();
    }
}
// Make sure the channel and connection are closed when the script exits.
register_shutdown_function('shutdown', $ch, $conn);

// Keep waiting on the channel for as long as it has callbacks registered.
while (count($ch->callbacks) > 0) {
    $ch->wait();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment