Created
July 5, 2009 22:40
-
-
Save anotherjesse/141149 to your computer and use it in GitHub Desktop.
Using RabbitMQ from PHP via AMQP and STOMP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/php
<?php
// AMQP based RabbitMQ consumer
// Uses http://code.google.com/p/php-amqplib/
//
// Declares the queue, issues a single basic_get on it and, if a message
// came back, prints its body and acknowledges it. The channel and the
// connection are then closed. Any exception is reported on stdout together
// with its stack trace.
require_once('amqp.inc');

// Broker connection settings.
$HOST = 'localhost';
$PORT = 5672;
$USER = 'guest';
$PASS = 'guest';

// Name of the queue to read from.
$QUEUE = 'po_box';

try {
    // Connect with the settings defined above. The previous code passed
    // $BROKER_HOST, $BROKER_PORT and $PASSWORD, none of which exist in this
    // script, so the connection was attempted with null arguments.
    $conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
    $chan = $conn->channel();
    $chan->queue_declare($QUEUE);

    // basic_get is called exactly once; a falsy result is treated as
    // "nothing to consume" and is not an error.
    $msg = $chan->basic_get($QUEUE);
    if ($msg) {
        echo "Message: " . $msg->body . "\n";
        // Acknowledge only after the body has been printed.
        $chan->basic_ack($msg->delivery_info['delivery_tag']);
    }

    $chan->close();
    $conn->close();
} catch (Exception $e) {
    echo 'Caught exception: ', $e->getMessage();
    echo "\nTrace:\n" . $e->getTraceAsString();
}
?>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/php
<?php
// STOMP based RabbitMQ producer
//
// Joins all command line arguments into one space-separated message and
// sends it to the queue through StompSender. Without arguments it prints
// a usage line instead.
require_once('StompSender.php');

// Broker connection settings.
$HOST = 'localhost';
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'direct';

// Destination queue.
$QUEUE = 'po_box';

if (count($argv) <= 1) {
    // Only the script name was given: nothing to send.
    echo "Usage: " . $argv[0] . " message to send\n";
} else {
    // Everything after the script name forms the message text.
    $text = implode(' ', array_slice($argv, 1));

    $sender = new StompSender($HOST, $USER, $PASS, $VHOST, $EXCHANGE);
    $sender->send($text, $QUEUE);

    echo "SENT: " . $text . "\n";
}
?>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * Handles SENDs to STOMP
 * Only tested with the rabbitmq stomp gateway and rabbitmq server 1.4.0
 * Only supports sending message to stomp, no support for other commands
 *
 * Original source: http://code.google.com/p/simplisticstompclient/
 *
 * Changes: Added ability to specify host
 *
 */
class StompSender{
    /** @var string Socket target including transport prefix, e.g. 'tcp://localhost' */
    private $host;
    /** @var int Port the STOMP endpoint listens on */
    private $port;
    /** @var string Value of the CONNECT frame's "login" header */
    private $login;
    /** @var string Value of the CONNECT frame's "passcode" header */
    private $passcode;
    /** @var string Value of the CONNECT frame's "virtual-host" header */
    private $virtual_host;
    /** @var string Value of the CONNECT frame's "realm" header */
    private $realm;
    /** @var int Stream timeout in seconds applied to the open socket */
    private $timeout;

    /**
     * Constructor
     *
     * The port defaults to 61613 and the timeout to 5 seconds; both can be
     * changed afterwards with setPort() and setTimeout().
     *
     * @param string $host (e.g. 'localhost'), 'tcp://' is prepended
     * @param string $login (e.g. 'guest')
     * @param string $passcode (e.g. 'guest')
     * @param string $virtual_host (e.g. '/')
     * @param string $realm (e.g. '/data')
     */
    public function __construct($host, $login, $passcode, $virtual_host, $realm){
        $this->host = 'tcp://' . $host;
        $this->port = 61613;
        $this->login = (string)$login;
        $this->passcode = (string)$passcode;
        $this->virtual_host = (string)$virtual_host;
        $this->realm = (string)$realm;
        $this->timeout = 5;
    }

    /**
     * Sets the stream timeout applied to the socket after it has been
     * opened, default 5 sec
     *
     * @param int $sec Value in seconds
     */
    public function setTimeout($sec){
        $this->timeout = (int)$sec;
    }

    /**
     * Sets host where stomp is listening.
     *
     * Unlike the constructor, this setter stores the value as given, so it
     * must already contain the transport prefix (e.g. 'tcp://localhost').
     *
     * @param string $host
     */
    public function setHost($host){
        $this->host = (string)$host;
    }

    /**
     * Sets port where stomp is listening, default 61613
     *
     * @param int $port
     */
    public function setPort($port){
        $this->port = (int)$port;
    }

    /**
     * Sends a message to a queue
     *
     * Opens a socket, writes CONNECT, SEND (with receipt id "ok") and
     * DISCONNECT in a single write, then expects a CONNECTED frame followed
     * by a RECEIPT frame for "ok". The socket is closed on every exit path,
     * including when an exception is thrown.
     *
     * The SEND frame carries no content-length header and is terminated by
     * a NUL byte, so $message must not contain NUL bytes.
     *
     * @param string $message Message to send
     * @param string $queue Destination
     * @throws StompException on socket fails and unexpected responses
     * @return boolean true on success (throws exception on fail)
     */
    public function send($message, $queue){
        $m = (string)$message;
        $q = (string)$queue;
        $msg_connect = "CONNECT\nlogin:$this->login\npasscode:$this->passcode\nvirtual-host:$this->virtual_host\nrealm:$this->realm\n\n\x00";
        $msg_send = "SEND\ndestination:$q\nreceipt:ok\n\n$m\x00";
        $msg_disconnect = "DISCONNECT\n\n\x00";

        if(!($r = fsockopen($this->host,$this->port))) throw new StompException('fsockopen failed');

        // catch + rethrow (rather than finally) keeps this runnable on the
        // PHP 5 versions the rest of the file targets.
        try{
            stream_set_timeout($r, $this->timeout);

            if(!fwrite($r, $msg_connect.$msg_send.$msg_disconnect)){
                $this->fail($r, 'fwrite failed');
            }

            // Read whole frames instead of fixed byte counts: the length of
            // the CONNECTED frame depends on the headers the server sends
            // (e.g. the session id), so a hard-coded skip is not reliable.
            if(strpos($this->readFrame($r), 'CONNECTED') !== 0){
                $this->fail($r, 'did not get response CONNECTED');
            }
            if(strpos($this->readFrame($r), "RECEIPT\nreceipt-id:ok") !== 0){
                $this->fail($r, 'did not get response RECEIPT');
            }
        }catch(Exception $e){
            // Do not leak the socket when the exchange fails.
            fclose($r);
            throw $e;
        }

        fclose($r);
        return true;
    }

    /**
     * Reads one NUL-terminated frame from the socket.
     *
     * @param resource $r Open socket
     * @return string Frame text without the terminating NUL and without
     *                leading line breaks (a server may emit a newline after
     *                the previous frame's NUL); empty string when nothing
     *                could be read
     */
    private function readFrame($r){
        $frame = stream_get_line($r, 8192, "\x00");
        if($frame === false) return '';
        return ltrim($frame, "\r\n");
    }

    /**
     * Throws the StompException for a failed socket operation.
     *
     * Reports 'connection timed out' when the stream metadata says the
     * stream timed out, otherwise the given message.
     *
     * @param resource $r Open socket
     * @param string $message Message used when the failure is not a timeout
     * @throws StompException always
     */
    private function fail($r, $message){
        $md = stream_get_meta_data($r);
        if($md['timed_out']) throw new StompException('connection timed out');
        throw new StompException($message);
    }
}

/**
 * Exception thrown by StompSender on socket fails and unexpected responses
 *
 */
class StompException extends Exception {}
?>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment