Skip to content

Instantly share code, notes, and snippets.

@AllenJB
Last active July 6, 2018 11:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AllenJB/03b9a81255eddb47dab4b2d39c6de9c7 to your computer and use it in GitHub Desktop.
Save AllenJB/03b9a81255eddb47dab4b2d39c6de9c7 to your computer and use it in GitHub Desktop.
Bunny Async Worker w/ signal handler
<?php
use Bunny\Channel;
use Bunny\Async\Client;
use Bunny\Message;
use Bunny\Protocol\MethodBasicConsumeOkFrame;
use React\EventLoop\Factory;
require '../../vendor/autoload.php';
class WorkerProc
{
public $consumerTag = null;
/**
* @var Channel|null
*/
public $channel = null;
public function run() : void
{
$loop = Factory::create();
$clientConfig = [
"host" => "rabbitmq.example.com",
"port" => 5672,
"vhost" => "/",
"user" => "app",
"password" => "app",
];
$proc = $this;
$client = new Client($loop, $clientConfig);
$client->connect()->then(function (Client $client) {
return $client->channel();
}, function($reason) {
$reasonMsg = "";
if (is_string($reason)) {
$reasonMsg = $reason;
} else if ($reason instanceof Throwable) {
$reasonMsg = $reason->getMessage();
}
print "Rejected: {$reasonMsg}\n";
})->then(function (Channel $channel) {
return $channel->qos(0, 1)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) {
return $channel->queueDeclare('test', false, true, false, false)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) use ($proc) {
$proc->channel = $channel;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$channel->consume(
function (Message $message, Channel $channel, Client $client) use ($proc) {
echo " [x] Received ", $message->content, "\n";
for ($i = 0; $i < 3; $i++) {
print "WU {$i}\n";
password_hash(random_bytes(255), PASSWORD_BCRYPT, ["cost" => 15]);
}
echo " [x] Done ", $message->content, "\n";
$channel->ack($message)->then(function() use ($message) {
print "ACK :: {$message->content}\n";
}, function($reason) {
$reasonMsg = "";
if (is_string($reason)) {
$reasonMsg = $reason;
} else if ($reason instanceof Throwable) {
$reasonMsg = $reason->getMessage();
}
print "ACK FAILED! - {$reasonMsg}\n";
})->done();
},
'test'
)->then(function (MethodBasicConsumeOkFrame $response) use ($proc) {
$proc->consumerTag = $response->consumerTag;
})->done();
})->done();
$loop->addSignal(SIGINT, function (int $signal) use ($proc) {
print "Consumer cancelled\n";
$proc->channel->cancel($proc->consumerTag)->done(function() {
exit();
});
});
$loop->addSignal(SIGTERM, function (int $signal) use ($proc) {
print "Consumer cancelled\n";
$proc->channel->cancel($proc->consumerTag)->done(function() {
exit();
});
});
$loop->run();
}
}
$proc = new WorkerProc();
$proc->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment