Skip to content

Instantly share code, notes, and snippets.

Last active December 21, 2015 06:29
Show Gist options
  • Save frankmayer/6264596 to your computer and use it in GitHub Desktop.
Save frankmayer/6264596 to your computer and use it in GitHub Desktop.
FIles for testing strange performance difference between Linux and Windows
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\IO\AbstractIO;
class AbstractConnection extends AbstractChannel
public static $LIBRARY_PROPERTIES = array(
"library" => array('S', "PHP AMQP Lib"),
"library_version" => array('S', "2.0")
* contructor parameters for clone
* @var array
protected $construct_params;
* close the connection in destructur
* @var bool
protected $close_on_destruct = true ;
* @var null|\PhpAmqpLib\Wire\IO\AbstractIO
protected $io = null;
public function __construct($user, $password,
AbstractIO $io)
// save the params for the use of __clone
$this->construct_params = func_get_args();
if ($user && $password) {
$login_response = new AMQPWriter();
$login_response->write_table(array("LOGIN" => array('S',$user),
"PASSWORD" => array('S',$password)));
$login_response = substr($login_response->getvalue(),4); //Skip the length
} else {
$login_response = null;
while (true) {
$this->channels = array();
// The connection object itself is treated as channel 0
parent::__construct($this, 0);
$this->channel_max = 65535;
$this->frame_max = 131072;
$this->io = $io;
$this->input = new AMQPReader(null, $this->io);
$this->x_start_ok($d, $login_method, $login_response, $locale);
$this->wait_tune_ok = true;
while ($this->wait_tune_ok) {
$host = $this->x_open($vhost,"", $insist);
if (!$host) {
return; // we weren't redirected
// we were redirected, close the socket, loop and try again
* cloning will use the old properties to make a new connection to the same server
public function __clone()
call_user_func_array(array($this, '__construct'), $this->construct_params);
public function __destruct()
if ($this->close_on_destruct) {
if (isset($this->input) && $this->input) {
// close() always tries to connect to the server to shutdown
// the connection. If the server has gone away, it will
// throw an error in the connection class, so catch it
// and shutdown quietly
try {
} catch (\Exception $e) { }
public function select($sec, $usec = 0)
return $this->getIO()->select($sec, $usec);
* allows to not close the connection
* it`s useful after the fork when you don`t want to close parent process connection
* @param bool $close
public function set_close_on_destruct($close = true)
$this->close_on_destruct = (bool) $close;
protected function close_socket()
if ($this->debug) {
MiscHelper::debug_msg("closing socket");
protected function write($data)
if ($this->debug) {
MiscHelper::debug_msg("< [hex]:\n" . MiscHelper::hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
protected function do_close()
if (isset($this->input) && $this->input) {
$this->input = null;
public function get_free_channel_id()
for ($i=1; $i <= $this->channel_max; $i++) {
if (!isset($this->channels[$i])) {
return $i;
throw new AMQPRuntimeException("No free channel ids");
public function send_content($channel, $class_id, $weight, $body_size,
$packed_properties, $body)
$pkt = new AMQPWriter();
$pkt = $pkt->getvalue();
while ($body) {
$payload = substr($body,0, $this->frame_max-8);
$body = substr($body,$this->frame_max-8);
$pkt = new AMQPWriter();
$pkt = $pkt->getvalue();
protected function send_channel_method_frame($channel, $method_sig, $args="")
if ($args instanceof AMQPWriter) {
$args = $args->getvalue();
$pkt = new AMQPWriter();
$pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id
// in payload
$pkt->write_short($method_sig[0]); // class_id
$pkt->write_short($method_sig[1]); // method_id
$pkt = $pkt->getvalue();
if ($this->debug) {
MiscHelper::debug_msg("< " . MiscHelper::methodSig($method_sig) . ": " .
* Wait for a frame from the server
protected function wait_frame($timeout = 0)
// frankmayer debugging addition: Start measuring time
if (AMQP_DEBUG_FM === true ) {
$microtimeStart = microtime(true);
// frankmayer debugging addition
$currentTimeout = $this->input->getTimeout();
try {
$frame_type = $this->input->read_octet();
$channel = $this->input->read_short();
$size = $this->input->read_long();
$payload = $this->input->read($size);
$ch = $this->input->read_octet();
} catch(AMQPTimeoutException $e) {
throw $e;
if ($ch != 0xCE) {
throw new AMQPRuntimeException(sprintf("Framing error, unexpected byte: %x", $ch));
// frankmayer debugging addition: End measuring time & output result
if (AMQP_DEBUG_FM === true ) {
$microtimeEnd = microtime(true);
$microtimeDiff = $microtimeEnd -$microtimeStart;
echo " [.] AbstractConnection::wait_frame() Got response in: ",sprintf("%.4f", ($microtimeDiff))." seconds","\n";
// end frankmayer debugging addition
return array($frame_type, $channel, $payload);
* Wait for a frame from the server destined for
* a particular channel.
protected function wait_channel($channel_id, $timeout = 0)
while (true) {
list($frame_type, $frame_channel, $payload) = $this->wait_frame($timeout);
if ($frame_channel == $channel_id) {
return array($frame_type, $payload);
// Not the channel we were looking for. Queue this frame
//for later, when the other channel is looking for frames.
array($frame_type, $payload));
// If we just queued up a method for channel 0 (the Connection
// itself) it's probably a close method in reaction to some
// error, so deal with it right away.
if (($frame_type == 1) && ($frame_channel == 0)) {
* Fetch a Channel object identified by the numeric channel_id, or
* create that object if it doesn't already exist.
public function channel($channel_id = null)
if (isset($this->channels[$channel_id])) {
return $this->channels[$channel_id];
} else {
$channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
$ch = new AMQPChannel($this->connection, $channel_id);
$this->channels[$channel_id] = $ch;
return $ch;
* request a connection close
public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
$this->send_method_frame(array($class_id, $method_id), $args);
return $this->wait(array(
public static function dump_table($table)
$tokens = array();
foreach ($table as $name => $value) {
switch ($value[0]) {
case 'D':
$val = $value[1]->n . 'E' . $value[1]->e;
case 'F':
$val = '(' . self::dump_table($value[1]) . ')';
case 'T':
$val = date('Y-m-d H:i:s', $value[1]);
$val = $value[1];
$tokens[] = $name . '=' . $val;
return implode(', ', $tokens);
protected function connection_close($args)
$reply_code = $args->read_short();
$reply_text = $args->read_shortstr();
$class_id = $args->read_short();
$method_id = $args->read_short();
throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id));
* confirm a connection close
protected function x_close_ok()
$this->send_method_frame(array(10, 61));
* confirm a connection close
protected function connection_close_ok($args)
protected function x_open($virtual_host, $capabilities="", $insist=false)
$args = new AMQPWriter();
$this->send_method_frame(array(10, 40), $args);
$wait = array(
if ($this->protocolVersion == '0.8') {
$wait[] = $this->waitHelper->get_wait('connection.redirect');
return $this->wait($wait);
* signal that the connection is ready
protected function connection_open_ok($args)
$this->known_hosts = $args->read_shortstr();
if ($this->debug) {
MiscHelper::debug_msg("Open OK! known_hosts: " . $this->known_hosts);
return null;
* asks the client to use a different server
protected function connection_redirect($args)
$host = $args->read_shortstr();
$this->known_hosts = $args->read_shortstr();
if ($this->debug) {
MiscHelper::debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
return $host;
* security mechanism challenge
protected function connection_secure($args)
$challenge = $args->read_longstr();
* security mechanism response
protected function x_secure_ok($response)
$args = new AMQPWriter();
$this->send_method_frame(array(10, 21), $args);
* start connection negotiation
protected function connection_start($args)
$this->version_major = $args->read_octet();
$this->version_minor = $args->read_octet();
$this->server_properties = $args->read_table();
$this->mechanisms = explode(" ", $args->read_longstr());
$this->locales = explode(" ", $args->read_longstr());
if ($this->debug) {
MiscHelper::debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s",
implode(', ', $this->mechanisms),
implode(', ', $this->locales)));
protected function x_start_ok($client_properties, $mechanism, $response, $locale)
$args = new AMQPWriter();
$this->send_method_frame(array(10, 11), $args);
* propose connection tuning parameters
protected function connection_tune($args)
$v = $args->read_short();
if ($v) {
$this->channel_max = $v;
$v = $args->read_long();
if ($v) {
$this->frame_max = $v;
$this->heartbeat = $args->read_short();
$this->x_tune_ok($this->channel_max, $this->frame_max, 0);
* negotiate connection tuning parameters
protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
$args = new AMQPWriter();
$this->send_method_frame(array(10, 31), $args);
$this->wait_tune_ok = false;
* get socket from current connection
public function getSocket()
return $this->sock;
* @return \PhpAmqpLib\Wire\IO\AbstractIO
protected function getIO() {
return $this->io;
define('AMQP_DEBUG_FM', true);
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
// if ($n == 0)
// return 0;
// if ($n == 1)
// return 1;
// return fib($n-1) + fib($n-2);
return $n;
echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
$msg, '', $req->get('reply_to'));
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct()
$this->connection = new AMQPConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
array($this, 'on_response')
public function on_response($rep)
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
public function call($n)
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
$this->channel->basic_publish($msg, '', 'rpc_queue');
while (!$this->response) {
return intval($this->response);
define('AMQP_DEBUG_FM', true);
$fibonacci_rpc = new FibonacciRpcClient();
for ($i = 0; $i <= 20000; $i++) {
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment