anotherjesse (owner)

Revisions

gist: 141149 Download_button fork
public
Description:
using rabbitmq from php via amqp and stomp
Public Clone URL: git://gist.github.com/141149.git
Embed All Files: show embed
consumer.php #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/php
<?php

// AMQP based RabbitMQ consumer
// Uses http://code.google.com/p/php-amqplib/
//
// Declares $QUEUE, fetches at most one message from it with basic_get,
// prints the message body and acknowledges it, then closes the channel
// and the connection. Any exception is printed together with its trace.

require_once('amqp.inc');

$HOST = 'localhost';
$PORT = 5672;
$USER = 'guest';
$PASS = 'guest';
$QUEUE = 'po_box';

try {
  // Connect with the settings declared above ($HOST/$PORT/$USER/$PASS).
  $conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
  $chan = $conn->channel();
  $chan->queue_declare($QUEUE);

  // Single synchronous fetch; the result is falsy when nothing was returned.
  $msg = $chan->basic_get($QUEUE);

  if ($msg) {
    echo "Message: " . $msg->body . "\n";
    // Acknowledge using the delivery tag carried by the fetched message.
    $chan->basic_ack($msg->delivery_info['delivery_tag']);
  }

  $chan->close();
  $conn->close();
} catch (Exception $e) {
  echo 'Caught exception: ', $e->getMessage();
  echo "\nTrace:\n" . $e->getTraceAsString();
}
?>
producer.php #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/php
<?php

// STOMP based RabbitMQ producer
//
// Joins every command-line argument after the script name into one
// space-separated message and sends it to $QUEUE through StompSender.
// When no arguments are given, a usage line is printed instead.

require_once('StompSender.php');

$HOST = 'localhost';
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'direct';
$QUEUE = 'po_box';

if (count($argv) <= 1) {
  // Only the script name is present: nothing to send.
  echo "Usage: ", $argv[0], " message to send\n";
}
else {
  // Everything after the script name forms the message text.
  $text = implode(' ', array_slice($argv, 1));

  $sender = new StompSender($HOST, $USER, $PASS, $VHOST, $EXCHANGE);
  $sender->send($text, $QUEUE);

  echo "SENT: ", $text, "\n";
}
?>
StompSender.php #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php

/**
 * Handles SENDs to STOMP
 * Only tested with the rabbitmq stomp gateway and rabbitmq server 1.4.0
 * Only supports sending message to stomp, no support for other commands
 *
 * Original source: http://code.google.com/p/simplisticstompclient/
 *
 * Changes: Added ability to specify host
 *
 */
class StompSender{
    /** Upper bound on the number of bytes accepted for one response frame. */
    const MAX_FRAME_BYTES = 65536;

    private $host;
    private $port;

    private $login;
    private $passcode;
    private $virtual_host;
    private $realm;

    private $timeout;

    /**
     * Constructor
     *
     * The host is stored with a 'tcp://' prefix, the port defaults to 61613
     * and the timeout defaults to 5 seconds.
     *
     * @param string $host (e.g. 'localhost')
     * @param string $login (e.g. 'guest')
     * @param string $passcode (e.g. 'guest')
     * @param string $virtual_host (e.g. '/')
     * @param string $realm (e.g. '/data')
     */
    public function __construct($host, $login, $passcode, $virtual_host, $realm){
        $this->host = 'tcp://' . $host;
        $this->port = 61613;

        $this->login = (string)$login;
        $this->passcode = (string)$passcode;
        $this->virtual_host = (string)$virtual_host;
        $this->realm = (string)$realm;

        $this->timeout = 5;
    }

    /**
     * Sets timeout for socket connection, default 5 sec
     *
     * The value is used both when opening the connection and as the
     * read/write timeout of the open socket.
     *
     * @param int $sec Value in seconds
     */
    public function setTimeout($sec){
        $this->timeout = (int)$sec;
    }

    /**
     * Sets host where stomp is listening
     *
     * Unlike the constructor, the value is stored exactly as given
     * (no 'tcp://' prefix is added).
     *
     * @param string $host
     */
    public function setHost($host){
        $this->host = (string)$host;
    }

    /**
     * Sets port where stomp is listening, default 61613
     *
     * @param int $port
     */
    public function setPort($port){
        $this->port = (int)$port;
    }

    /**
     * Sends a message to a queue
     *
     * Opens a socket, writes CONNECT, SEND (with receipt:ok) and DISCONNECT
     * frames in one go, then expects a CONNECTED frame followed by a RECEIPT
     * frame for receipt id "ok". The socket is closed on success and on
     * every failure path.
     *
     * NOTE: frames are NUL-terminated and no content-length header is sent,
     * so $message must not contain a NUL byte and $queue must not contain a
     * newline.
     *
     * @param string $message Message to send
     * @param string $queue Destination
     * @throws StompException on socket fails and unexpected responses
     * @return boolean true on success (throws exception on fail)
     */
    public function send($message, $queue){
        $m = (string)$message;
        $q = (string)$queue;

        $msg_connect = "CONNECT\nlogin:$this->login\npasscode:$this->passcode\nvirtual-host:$this->virtual_host\nrealm:$this->realm\n\n\x00";
        $msg_send = "SEND\ndestination:$q\nreceipt:ok\n\n$m\x00";
        $msg_disconnect = "DISCONNECT\n\n\x00";
        $payload = $msg_connect.$msg_send.$msg_disconnect;

        // Apply the configured timeout to the connection attempt as well.
        $errno = 0;
        $errstr = '';
        $r = fsockopen($this->host, $this->port, $errno, $errstr, $this->timeout);
        if(!$r) throw new StompException("fsockopen failed: $errstr ($errno)");

        try{
            stream_set_timeout($r, $this->timeout);

            $written = fwrite($r, $payload);
            if($written === false || $written < strlen($payload)){
                $this->throwIfTimedOut($r);
                throw new StompException('fwrite failed');
            }

            // Read whole frames instead of fixed byte counts so the length
            // of the session header does not matter.
            if(strncmp($this->readFrame($r), 'CONNECTED', 9) !== 0){
                throw new StompException('did not get response CONNECTED');
            }
            if(strncmp($this->readFrame($r), "RECEIPT\nreceipt-id:ok", 21) !== 0){
                throw new StompException('did not get response RECEIPT');
            }
        }catch(Exception $e){
            // Never leak the socket when anything above fails.
            fclose($r);
            throw $e;
        }
        fclose($r);

        return true;
    }

    /**
     * Reads one NUL-terminated frame from the stream.
     *
     * Newlines preceding the frame (left over after the previous frame's
     * terminator) are stripped. If the stream ends before a NUL is seen,
     * whatever was read so far is returned.
     *
     * @param resource $socket Open stream to read from
     * @throws StompException on timeout or when the frame exceeds MAX_FRAME_BYTES
     * @return string Frame contents without the terminating NUL
     */
    private function readFrame($socket){
        $frame = '';
        while(true){
            $byte = fread($socket, 1);
            if($byte === false || $byte === ''){
                // Nothing more to read: either a timeout or end of stream.
                $this->throwIfTimedOut($socket);
                break;
            }
            if($byte === "\x00") break;
            $frame .= $byte;
            if(strlen($frame) > self::MAX_FRAME_BYTES){
                throw new StompException('response frame too large');
            }
        }

        return ltrim($frame, "\r\n");
    }

    /**
     * Throws when the stream's metadata reports that it timed out.
     *
     * @param resource $socket Open stream to inspect
     * @throws StompException when the stream timed out
     */
    private function throwIfTimedOut($socket){
        $md = stream_get_meta_data($socket);
        if($md['timed_out']) throw new StompException('connection timed out');
    }
}

/**
 * Exception thrown by StompSender on socket fails and unexpected responses
 *
 */
class StompException extends Exception {}
?>