Skip to content

Instantly share code, notes, and snippets.

@anotherjesse
Created July 5, 2009 22:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anotherjesse/141149 to your computer and use it in GitHub Desktop.
Save anotherjesse/141149 to your computer and use it in GitHub Desktop.
Using RabbitMQ from PHP via AMQP and STOMP
#!/usr/bin/php
<?php
// AMQP based RabbitMQ consumer.
// Fetches at most one message from $QUEUE, prints its body and acknowledges it.
// Uses http://code.google.com/p/php-amqplib/
require_once('amqp.inc');

// Broker connection settings.
$HOST = 'localhost';
$PORT = 5672;
$USER = 'guest';
$PASS = 'guest';
$QUEUE = 'po_box';

try {
    // Connect using the settings declared above.
    $conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
    $chan = $conn->channel();

    // Declare the queue before reading from it.
    $chan->queue_declare($QUEUE);

    // Single synchronous fetch; a falsy result presumably means the queue
    // was empty (verify against php-amqplib), in which case nothing is printed.
    $msg = $chan->basic_get($QUEUE);
    if ($msg) {
        echo "Message: " . $msg->body . "\n";
        // Acknowledge only after the body has been printed.
        $chan->basic_ack($msg->delivery_info['delivery_tag']);
    }

    $chan->close();
    $conn->close();
} catch (Exception $e) {
    echo 'Caught exception: ', $e->getMessage();
    echo "\nTrace:\n" . $e->getTraceAsString() . "\n";
}
?>
#!/usr/bin/php
<?php
// STOMP based RabbitMQ producer.
// Joins all command-line arguments with spaces and sends them as a single
// message to $QUEUE through StompSender.
require_once('StompSender.php');

// Broker connection settings.
$HOST = 'localhost';
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'direct';
$QUEUE = 'po_box';

if (count($argv) > 1) {
    // Drop the script name; everything after it is the message text.
    $msg = implode(' ', array_slice($argv, 1));

    try {
        $ss = new StompSender($HOST, $USER, $PASS, $VHOST, $EXCHANGE);
        $ss->send($msg, $QUEUE);
        echo "SENT: " . $msg . "\n";
    } catch (Exception $e) {
        // send() throws on socket failures and unexpected broker responses;
        // report it the same way the consumer script does instead of dying
        // with an uncaught-exception fatal error.
        echo 'Caught exception: ', $e->getMessage(), "\n";
        exit(1);
    }
}
else {
    echo "Usage: " . $argv[0] . " message to send\n";
}
?>
<?php
/**
* Handles SENDs to STOMP
* Only tested with the rabbitmq stomp gateway and rabbitmq server 1.4.0
* Only supports sending message to stomp, no support for other commands
*
* Original source: http://code.google.com/p/simplisticstompclient/
*
* Changes: Added ability to specify host
*
*/
class StompSender{
    /** @var string Socket target passed to fsockopen(), e.g. 'tcp://localhost' */
    private $host;
    /** @var int Port passed to fsockopen() */
    private $port;
    /** @var string Value of the CONNECT frame's "login" header */
    private $login;
    /** @var string Value of the CONNECT frame's "passcode" header */
    private $passcode;
    /** @var string Value of the CONNECT frame's "virtual-host" header */
    private $virtual_host;
    /** @var string Value of the CONNECT frame's "realm" header */
    private $realm;
    /** @var int Timeout in seconds used for connecting and for stream reads/writes */
    private $timeout;

    /**
     * Constructor
     *
     * The host is prefixed with 'tcp://'; port defaults to 61613 and the
     * timeout to 5 seconds (see setPort() / setTimeout()).
     *
     * @param string $host (e.g. 'localhost')
     * @param string $login (e.g. 'guest')
     * @param string $passcode (e.g. 'guest')
     * @param string $virtual_host (e.g. '/')
     * @param string $realm (e.g. '/data')
     */
    public function __construct($host, $login, $passcode, $virtual_host, $realm){
        $this->host = 'tcp://' . $host;
        $this->port = 61613;
        $this->login = (string)$login;
        $this->passcode = (string)$passcode;
        $this->virtual_host = (string)$virtual_host;
        $this->realm = (string)$realm;
        $this->timeout = 5;
    }

    /**
     * Sets timeout for socket connection, default 5 sec
     *
     * @param int $sec Value in seconds
     */
    public function setTimeout($sec){
        $this->timeout = (int)$sec;
    }

    /**
     * Sets host where stomp is listening, default 'tcp://localhost'
     *
     * Unlike the constructor, the value is stored as given (no 'tcp://'
     * prefix is added), so pass the full transport URL.
     *
     * @param string $host
     */
    public function setHost($host){
        $this->host = (string)$host;
    }

    /**
     * Sets port where stomp is listening, default 61613
     *
     * @param int $port
     */
    public function setPort($port){
        $this->port = (int)$port;
    }

    /**
     * Sends a message to a queue
     *
     * Opens a socket, writes CONNECT, SEND (with receipt:ok) and DISCONNECT
     * frames in one go, then checks that the response starts with CONNECTED
     * and contains the RECEIPT for the SEND. The socket is closed on success
     * and on every failure path.
     *
     * NOTE(review): $message and $queue are interpolated into the frames
     * unescaped; a NUL byte in the message or a newline in the queue name
     * would alter the frame structure.
     *
     * @param string $message Message to send
     * @param string $queue Destination
     * @throws StompException on socket fails and unexpected responses
     * @return boolean true on success (throws exception on fail)
     */
    public function send($message, $queue){
        $m = (string)$message;
        $q = (string)$queue;
        $msg_connect = "CONNECT\nlogin:$this->login\npasscode:$this->passcode\nvirtual-host:$this->virtual_host\nrealm:$this->realm\n\n\x00";
        $msg_send = "SEND\ndestination:$q\nreceipt:ok\n\n$m\x00";
        $msg_disconnect = "DISCONNECT\n\n\x00";

        // Apply the configured timeout to the connect phase as well, so an
        // unreachable host does not block for PHP's default_socket_timeout.
        $errno = 0;
        $errstr = '';
        $r = fsockopen($this->host, $this->port, $errno, $errstr, $this->timeout);
        if(!$r) throw new StompException("fsockopen failed: $errstr ($errno)");

        try {
            stream_set_timeout($r, $this->timeout);

            if(!fwrite($r, $msg_connect.$msg_send.$msg_disconnect)){
                $this->throwIfTimedOut($r);
                throw new StompException('fwrite failed');
            }

            if('CONNECTED' !== fread($r, 9)){
                $this->throwIfTimedOut($r);
                throw new StompException('did not get response CONNECTED');
            }

            // Skip a fixed 44 bytes between "CONNECTED" and the RECEIPT frame.
            // NOTE(review): presumably the session header and frame terminator
            // as sent by the RabbitMQ STOMP gateway this was tested with; the
            // length is hard-coded and may not hold for other brokers - confirm.
            fread($r, 44);
            $this->throwIfTimedOut($r);

            if("RECEIPT\nreceipt-id:ok" !== fread($r, 21)){
                $this->throwIfTimedOut($r);
                throw new StompException('did not get response RECEIPT');
            }
        } catch (Exception $e) {
            // Release the socket before propagating any failure.
            fclose($r);
            throw $e;
        }

        fclose($r);
        return true;
    }

    /**
     * Throws if the last operation on the stream hit the stream timeout.
     *
     * @param resource $r Open socket stream
     * @throws StompException when the stream metadata reports a timeout
     */
    private function throwIfTimedOut($r){
        $md = stream_get_meta_data($r);
        if($md['timed_out']) throw new StompException('connection timed out');
    }
}
/**
 * Error type raised by StompSender.
 *
 * Signals a socket failure or an unexpected response while sending; it adds
 * nothing to the base Exception beyond a distinct class name to catch.
 */
class StompException extends Exception
{
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment