Skip to content

Instantly share code, notes, and snippets.

@Taluu
Created April 15, 2014 16:46
Show Gist options
  • Save Taluu/10746728 to your computer and use it in GitHub Desktop.
Save Taluu/10746728 to your computer and use it in GitHub Desktop.
<?php
use \AMQPQueue,
\AMQPChannel,
\AMQPExchange,
\AMQPEnvelope,
\AMQPConnection;
require __DIR__ . '/vendor/autoload.php';
// Work-queue demo built on the PECL amqp extension alone.
// Usage: php <script> send [message]   publish one message (default "oh hai")
//        php <script> receive          consume messages until interrupted
if (2 > $argc) {
    // Usage errors go to STDERR with a non-zero status so a calling shell can detect them.
    fwrite(STDERR, 'wrong argument count, expected at least 1 argument, ' . ($argc - 1) . " given\n");
    exit(1);
}

// Reject unknown commands before opening a connection instead of silently doing nothing.
if (!in_array($argv[1], ['send', 'receive'], true)) {
    fwrite(STDERR, 'unknown command "' . $argv[1] . '", expected "send" or "receive"' . "\n");
    exit(1);
}

$c = new AMQPConnection([
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
]);
$c->connect();

$channel = new AMQPChannel($c);

// Both commands declare the direct exchange so either side can be started first.
$ex = new AMQPExchange($channel);
$ex->setName('ex_workers');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->declareExchange();

switch ($argv[1]) {
    case 'send':
        $data = 2 < $argc ? $argv[2] : 'oh hai';
        $ex->publish($data, 'worker');
        echo '[x] Sent "' . $data . "\"\n";
        break;

    case 'receive':
        echo '[x] Waiting for new messages... CTRL+C to exit' . "\n";

        // Hand each worker one unacknowledged message at a time so a long job
        // does not keep queued messages away from idle workers.
        $channel->setPrefetchCount(1);

        $q = new AMQPQueue($channel);
        $q->setName('queue_workers');
        $q->setFlags(AMQP_DURABLE);
        $q->declareQueue();
        $q->bind($ex->getName(), 'worker');

        // Blocks here; the callback runs once per delivered message.
        $q->consume(function (AMQPEnvelope $msg, AMQPQueue $queue) {
            echo "[x] Received \"{$msg->getBody()}\" from \"{$queue->getName()}\"\n";

            // Simulate work: sleep for the first number found in the body,
            // halved when that number is even.
            $matches = [];
            if (preg_match('`\d+`', $msg->getBody(), $matches)) {
                $sleep = intval($matches[0]);
                if (0 === $sleep % 2) {
                    $sleep = $sleep / 2;
                }
                sleep($sleep);
            }

            // Acknowledge only after the simulated work has finished.
            $queue->ack($msg->getDeliveryTag());
            echo '[x] Done' . "\n";
        });
        break;
}

$c->disconnect();
<?php
use \AMQPQueue,
\AMQPChannel,
\AMQPExchange,
\AMQPConnection;
use Swarrot\Broker\Message,
Swarrot\Broker\MessageProvider\PeclPackageMessageProvider,
Swarrot\Broker\MessagePublisher\PeclPackageMessagePublisher,
Swarrot\Consumer,
Swarrot\Processor\Ack\AckProcessor,
Swarrot\Processor\ProcessorInterface;
require __DIR__ . '/vendor/autoload.php';
/**
 * Demo processor: prints each message and simulates work whose duration is
 * derived from the first number found in the message body.
 */
class Processor implements ProcessorInterface
{
    /**
     * Print the message, then pause for the first integer found in its body,
     * interpreted as tenths of a second (even values are halved first).
     *
     * @param Message $message message handed over by the consumer
     * @param array   $options consumer options (not used by this processor)
     */
    public function process(Message $message, array $options)
    {
        // The id and body are passed as printf arguments, never as the format
        // string: a "%" in the body would otherwise be parsed as a conversion.
        printf("[x] Received message #%s : \"%s\"\n", $message->getId(), $message->getBody());

        if (preg_match('`\d+`', $message->getBody(), $matches)) {
            $tenths = intval($matches[0]);
            if (0 === $tenths % 2) {
                $tenths /= 2;
            }

            // Report the duration actually slept: $tenths is in tenths of a
            // second, so 15 means 1.5 seconds rather than "0.15".
            printf("[x] Processing : sleep for %.1f seconds\n", $tenths / 10);
            usleep($tenths * 100000);
        }

        echo "[x] Done\n\n";
    }
}
// Work-queue demo using Swarrot on top of the PECL amqp extension.
// Usage: php <script> send [message]   publish one message (default "oh hai")
//        php <script> receive          consume messages until interrupted
if (2 > $argc) {
    // Usage errors go to STDERR with a non-zero status so a calling shell can detect them.
    fwrite(STDERR, 'wrong argument count, expected at least 1 argument, ' . ($argc - 1) . " given\n");
    exit(1);
}

// Reject unknown commands before opening a connection instead of silently doing nothing.
if (!in_array($argv[1], ['send', 'receive'], true)) {
    fwrite(STDERR, 'unknown command "' . $argv[1] . '", expected "send" or "receive"' . "\n");
    exit(1);
}

$c = new AMQPConnection([
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
]);
$c->connect();

$channel = new AMQPChannel($c);

// Both commands declare the direct exchange so either side can be started first.
$ex = new AMQPExchange($channel);
$ex->setName('ex_workers');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->declareExchange();

switch ($argv[1]) {
    case 'send':
        $data = new Message(2 < $argc ? $argv[2] : 'oh hai');

        // Publish through Swarrot's publisher rather than the exchange directly.
        $publisher = new PeclPackageMessagePublisher($ex);
        $publisher->publish($data, 'worker');

        echo "[x] Sent Message #{$data->getId()} : \"{$data->getBody()}\"\n";
        break;

    case 'receive':
        echo '[x] Waiting for new messages... CTRL+C to exit' . "\n";

        $q = new AMQPQueue($channel);
        $q->setName('queue_workers');
        $q->setFlags(AMQP_DURABLE);
        $q->declareQueue();
        $q->bind($ex->getName(), 'worker');

        // AckProcessor wraps the demo Processor and is given the provider;
        // presumably it acknowledges each message after processing (confirm
        // against the Swarrot documentation).
        $provider = new PeclPackageMessageProvider($q);
        $consumer = new Consumer($provider, new AckProcessor(new Processor, $provider));
        $consumer->consume();
        break;
}

$c->disconnect();
<?php
use PhpAmqpLib\Message\AMQPMessage,
PhpAmqpLib\Connection\AMQPConnection;
require __DIR__ . '/vendor/autoload.php';
// Work-queue demo using php-amqplib (pure PHP client).
// Usage: php <script> send [message]   publish one message (default "oh hai")
//        php <script> receive          consume messages until interrupted
if (2 > $argc) {
    // Usage errors go to STDERR with a non-zero status so a calling shell can detect them.
    fwrite(STDERR, 'wrong argument count, expected at least 1 argument, ' . ($argc - 1) . " given\n");
    exit(1);
}

// Reject unknown commands before opening a connection instead of silently doing nothing.
if (!in_array($argv[1], ['send', 'receive'], true)) {
    fwrite(STDERR, 'unknown command "' . $argv[1] . '", expected "send" or "receive"' . "\n");
    exit(1);
}

$c = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $c->channel();

// Arguments: queue, passive, durable, exclusive, auto_delete.
// NOTE(review): the queue is declared non-durable while messages are published
// with delivery_mode 2 (persistent) — confirm whether the queue should be durable.
$channel->queue_declare('workers', false, false, false, false);

switch ($argv[1]) {
    case 'send':
        $data = 2 < $argc ? $argv[2] : 'oh hai';
        $msg = new AMQPMessage($data, ['delivery_mode' => 2]);

        // Empty exchange name: publish via the default exchange, routed by queue name.
        $channel->basic_publish($msg, '', 'workers');
        echo '[x] Sent "' . $data . "\"\n";
        break;

    case 'receive':
        echo '[x] Waiting for new messages... CTRL+C to exit' . "\n";

        // Prefetch count 1: at most one unacknowledged message per worker.
        $channel->basic_qos(null, 1, null);

        // Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
        // no_ack is false, so every message must be acknowledged explicitly.
        $channel->basic_consume('workers', '', false, false, false, false, function (AMQPMessage $msg) {
            echo '[x] Received "' . $msg->body . '"' . "\n";

            // Simulate work: sleep for the first number found in the body,
            // halved when that number is even.
            $matches = [];
            if (preg_match('`\d+`', $msg->body, $matches)) {
                $sleep = intval($matches[0]);
                if (0 === $sleep % 2) {
                    $sleep = $sleep / 2;
                }
                sleep($sleep);
            }

            echo '[x] Done' . "\n";
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        });

        // Keep dispatching deliveries for as long as a consumer callback is registered.
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        break;
}

$channel->close();
$c->close();
@Taluu
Copy link
Author

Taluu commented Apr 16, 2014

  • In several windows, run php pecl-only.php receive (4 looks good)
  • In one window, run for i in {1..15}; do php pecl-only.php send "message $i"; done

Replace pecl-only.php with pecl-swarrot.php in the commands above to test it with Swarrot.

the messages should be dispatched between the different windows initialized with receive.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment