Create a gist now

Instantly share code, notes, and snippets.

Beanstalkd queue worker
<?php
declare(strict_types=1);
namespace My\Queue;
use Pheanstalk\Pheanstalk;
use Exception;
/**
 * Long-running beanstalkd consumer.
 *
 * The worker watches a list of tubes, reserves jobs one at a time, passes each
 * job to a user-supplied handler and deletes the job when the handler returns.
 * If the handler throws an Exception the configured "on_error" callback is
 * invoked instead. After every loop iteration the worker checks its run time
 * and memory usage and terminates the PHP process once a limit is reached.
 *
 * Recognised options (see getDefaultOptions()):
 *  - sleep    (int)      seconds passed to sleep() when no job could be reserved
 *  - timeout  (int)      seconds since init() after which the worker stops
 *  - memory   (int)      memory_get_usage() threshold in MiB at which the worker stops
 *  - on_error (callable) called as ($job, $exception) when the handler throws
 */
class BeanstalkdWorker
{
    /** @var Pheanstalk Client used to watch tubes and to reserve, delete and bury jobs. */
    protected $queueClient;

    /** @var array Tube names handed to the client's watch() method. */
    protected $queueNames;

    /** @var callable Invoked with every reserved job. */
    protected $handler;

    /** @var array Default options overridden by the caller-supplied ones. */
    protected $options = [];

    /** @var int Unix timestamp recorded by init(); basis for the timeout check. */
    private $startTime;

    /**
     * Private on purpose: instances are obtained through create().
     *
     * @param Pheanstalk $queueClient
     * @param array      $queueNames
     * @param callable   $handler
     * @param array      $options     Overrides for the defaults, keyed by option name.
     */
    private function __construct(
        Pheanstalk $queueClient,
        array $queueNames,
        callable $handler,
        array $options
    ) {
        $this->queueClient = $queueClient;
        $this->queueNames = $queueNames;
        $this->handler = $handler;
        $this->options = array_merge($this->getDefaultOptions(), $options);
    }

    /**
     * Builds a worker.
     *
     * @param Pheanstalk $queueClient Connected beanstalkd client.
     * @param array      $queueNames  Tubes to watch.
     * @param callable   $handler     Receives each reserved job; throw an Exception to signal failure.
     * @param array      $options     Optional overrides for the default options.
     *
     * @return self
     */
    public static function create(
        Pheanstalk $queueClient,
        array $queueNames,
        callable $handler,
        array $options = []
    ) : self {
        // Forward exactly the four constructor arguments, in constructor order.
        return new self($queueClient, $queueNames, $handler, $options);
    }

    /**
     * Option values used when the caller does not override them.
     *
     * @return array
     */
    private function getDefaultOptions() : array
    {
        return [
            'sleep' => 5,
            'timeout' => 60,
            'memory' => 128,
            // Default failure policy: bury the job that was being processed.
            'on_error' => function ($job, Exception $error) {
                $this->queueClient->bury($job);
            },
        ];
    }

    /**
     * Runs the consume loop. The loop itself never returns; it ends when
     * stop() terminates the process.
     */
    public function run()
    {
        $this->init();

        while (true) {
            $this->runNextJob();

            if ($this->shouldRestart()) {
                $this->stop();
            }
        }
    }

    /**
     * Records the start time and subscribes to the configured tubes.
     */
    final protected function init()
    {
        $this->startTime = time();
        $this->watchQueues();
    }

    /**
     * Calls watch() on the client for every configured tube name.
     *
     * NOTE(review): nothing here ignores the client's initially watched tube;
     * confirm whether jobs from beanstalkd's "default" tube should also be consumed.
     */
    protected function watchQueues()
    {
        foreach ($this->queueNames as $queueName) {
            $this->queueClient->watch($queueName);
        }
    }

    /**
     * Reserves one job without blocking and processes it, or sleeps when the
     * reserve call yields nothing.
     */
    protected function runNextJob()
    {
        // Timeout 0: return immediately when no job is ready.
        $job = $this->queueClient->reserve(0);

        if (!$job) {
            $this->sleep();

            return;
        }

        try {
            $this->processJob($job);
        } catch (Exception $ex) {
            $this->handleJobException($job, $ex);
        }
    }

    /**
     * Passes the job to the handler and deletes it once the handler returns.
     *
     * @param mixed $job Job object returned by the client's reserve().
     */
    protected function processJob($job)
    {
        $handler = $this->handler;
        $handler($job);

        // Only reached when the handler did not throw.
        $this->queueClient->delete($job);
    }

    /**
     * Delegates a failed job to the configured "on_error" callback.
     *
     * @param mixed     $job       The job whose handler threw.
     * @param Exception $exception The exception caught in runNextJob().
     */
    protected function handleJobException($job, $exception)
    {
        $errorHandler = $this->options['on_error'];
        $errorHandler($job, $exception);
    }

    /**
     * Tells whether the worker has hit its run-time or memory limit.
     *
     * @return bool
     */
    protected function shouldRestart() : bool
    {
        return $this->timeoutReached($this->options['timeout'])
            || $this->memoryExceeded($this->options['memory']);
    }

    /**
     * @param int $timeout Maximum run time in seconds, measured from init().
     *
     * @return bool True once at least $timeout seconds have elapsed.
     */
    protected function timeoutReached(int $timeout) : bool
    {
        return (time() - $this->startTime) >= $timeout;
    }

    /**
     * @param int|float $memoryLimit Threshold in MiB.
     *
     * @return bool True when memory_get_usage() has reached the threshold.
     */
    protected function memoryExceeded($memoryLimit) : bool
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * Terminates the PHP process with exit status 0.
     */
    protected function stop()
    {
        exit(0);
    }

    /**
     * Pauses for the configured number of seconds.
     */
    protected function sleep()
    {
        sleep($this->options['sleep']);
    }
}
<?php
declare(strict_types=1);
// The worker class is declared in the My\Queue namespace.
use My\Queue\BeanstalkdWorker;
use Pheanstalk\Pheanstalk;
use Pheanstalk\Job;

/*
 * Entry script: consumes the "notifications" tube and sends one e-mail per job.
 * Each job body is expected to be a JSON object with the string fields
 * "to", "subject" and "message".
 */
$worker = BeanstalkdWorker::create(
    new Pheanstalk('127.0.0.1'),
    [
        'notifications',
    ],
    function (Job $job) {
        $notification = json_decode($job->getData(), true);

        // Validate before calling mail(): under strict_types a missing or
        // non-string field would raise a TypeError, which the worker's
        // catch (Exception) block does not intercept. Throwing an Exception
        // hands the job to the worker's on_error callback instead.
        if (!is_array($notification)) {
            throw new InvalidArgumentException(
                'Notification payload must be a JSON object (json error: ' . json_last_error_msg() . ')'
            );
        }

        foreach (['to', 'subject', 'message'] as $field) {
            if (!isset($notification[$field]) || !is_string($notification[$field])) {
                throw new InvalidArgumentException(
                    "Notification payload is missing the string field '{$field}'"
                );
            }
        }

        // mail() returns false when the message was not accepted for delivery;
        // surface that so the job is not deleted as if it had succeeded.
        if (!mail($notification['to'], $notification['subject'], $notification['message'])) {
            throw new RuntimeException('mail() did not accept the notification for delivery');
        }
    },
    [
        // Stop after 120 seconds of run time or at 256 MiB of memory usage.
        'timeout' => 120,
        'memory' => 256,
    ]
);
$worker->run();
; Supervisor program definition for the notifications queue worker.
[program:notificationsWorker]
; Command started for each worker process.
command=/usr/bin/php /home/ubuntu/app/bin/notifications_worker.php
; Process names combine the program name and the process number so that the
; numprocs instances below get distinct names.
process_name=%(program_name)s.%(process_num)s
; Number of worker processes to run.
numprocs=5
; Working directory for the processes.
directory=/tmp
; One stdout log file per process.
stdout_logfile=/var/log/supervisor/%(program_name)s.%(process_num)s.stdout.log
; Start together with supervisord.
autostart=true
; Restart a process whenever it exits.
; NOTE(review): presumably this is what brings a worker back after it exits on
; its own timeout/memory limit - confirm against the worker script.
autorestart=true
; Account the processes run as.
user=ubuntu
; Exit codes regarded as expected.
exitcodes=0
; NOTE(review): KILL cannot be handled by the process, so a job that is being
; processed is interrupted when supervisor stops a worker - confirm this is acceptable.
stopsignal=KILL
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment