Skip to content

Instantly share code, notes, and snippets.

@mikeyp
Created August 8, 2017 20:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikeyp/3623c5186dba5152f109ab45f5aad4d0 to your computer and use it in GitHub Desktop.
Save mikeyp/3623c5186dba5152f109ab45f5aad4d0 to your computer and use it in GitHub Desktop.
<?php
class WaitingQueueSignalHandler {
/**
* Flag indicating that a restart is necessary.
*
* @var bool
*/
protected $rebootRequired = FALSE;
/**
* Flag indicating that a job is currently being processed.
*
* @var bool
*/
protected $processing = FALSE;
/**
* An array of all the signals for which handlers have been registered.
*
* @var array
*/
protected $signals = array();
/**
* State variable that tracks if an interrupt has already been sent once on
* this job cycle.
*
* By keeping track of this, we can allow external actors to forcibly shut
* interrupt a job while in process without having to resort to SIGKILL.
*
* We do not reuse $rebootRequired in order to clearly separate the expected
* SIGALRM case from an unexpected external signal.
*
* @var bool
*/
protected $interrupted = FALSE;
/**
* Creates a new signal handler helper, attaching to it the set of desired
* signals.
*
* @param array $rebootSignals
* An array of signals (e.g., SIGINT, SIGTERM, SIGHUP). The only disallowed
* signal is SIGALRM, as waiting_queue uses it internally.
*/
public function __construct(array $rebootSignals = array()) {
$this->installHandlers($rebootSignals);
}
/**
* Informs the signal handler that job processing is commencing.
*
* @param mixed $item
* The job payload that is about to be processed. There are no restrictions
* on payload format, though they tend to be serialize()d strings.
*/
public function startJob($item) {
$this->processing = TRUE;
}
/**
* Informs the signal handler that job processing has finished.
*
* Calling this method does not imply success or failure of the job, only that
* processing has ceased.
*/
public function finishJob() {
$this->processing = FALSE;
// Reset interrupt state, even though it shouldn't matter.
$this->interrupted = FALSE;
if (TRUE === $this->rebootRequired) {
$this->gracefulExit();
}
}
/**
* Informs the signal handler that a reboot is needed at the next possible
* opportunity.
*/
public function rebootRequired() {
$this->rebootRequired = TRUE;
}
/**
* Processes an incoming signal.
*
* This is the main signal handler method, used to process all signals except
* SIGALRM.
*
* @param integer $signal
* The signal sent, as an integer.
*/
public function signalHandler($signal) {
if (TRUE === $this->processing) {
$this->rebootRequired();
if (SIGINT === $signal) {
if (FALSE === $this->interrupted) {
print("A job is currently being processed; the worker will be shut down when the job is finished. Press Ctrl-C again to quit immediately without waiting for the job to complete.\n");
$this->interrupted = TRUE;
}
else {
print("Shutting down immediately...\n");
$this->gracefulExit();
}
}
}
else {
$this->gracefulExit();
}
}
/**
* Processes an incoming SIGALRM, used for intentional timed self-termination.
*
* @param integer $signal
* The integer value of SIGALRM (14).
*/
public function alarmHandler($signal) {
if (TRUE === $this->processing) {
print("Alarm received, job is processing, exit soon\n");
$this->rebootRequired();
}
else {
print("Alarm received, exiting now\n");
$this->gracefulExit();
}
}
/**
* Terminate the current process as safely as possible.
*/
protected function gracefulExit() {
// Simply exit. Child classes should put cleanup logic here, if needed.
exit();
}
/**
* Registers signal handlers with the current PHP process.
*
* @param array $rebootSignals
*/
protected function installHandlers($rebootSignals) {
$this->signals = $rebootSignals;
declare(ticks = 1);
foreach ($rebootSignals as $signal) {
pcntl_signal($signal, array($this, 'signalHandler'));
}
pcntl_signal(SIGALRM, array($this, 'alarmHandler'));
}
}
$sh = new WaitingQueueSignalHandler([SIGTERM, SIGINT]);
pcntl_alarm(300);
// loop forever performing tasks
$seconds = 0;
while (1) {
$sh->startJob($seconds);
// do something interesting here
usleep(4000000);
$seconds += 4;
print("This program has been running for $seconds seconds\n");
$sh->finishJob();
}
<?php
/**
* Implements hook_drush_help().
*/
function waiting_queue_drush_help($section) {
switch ($section) {
case 'drush:process-waiting-queue':
return dt('Run the named, waiting Drupal queue. Allows for processing queues that block indefinitely waiting for new jobs without holding up other queues.');
}
}
/**
* Implements hook_drush_command().
*/
function waiting_queue_drush_command() {
$items['process-waiting-queue'] = array(
'callback' => 'waiting_queue_process_queue',
'description' => 'Run the named, waiting Drupal queue.',
'arguments' => array(
'queue_to_process' => 'The name of the queue to run.',
),
);
$items['test-signals'] = array(
'callback' => 'waiting_queue_test_signals',
'description' => '',
'bootstrap' => DRUSH_BOOTSTRAP_NONE,
);
return $items;
}
/**
* Gateway to either a signal or non-signal based waiting_queue implementation.
*
* Signal handling gives us a much more robust and precise queue, so we use it
* if it's at all available.
*
* @param string $queue_name
*/
function waiting_queue_process_queue($queue_name) {
set_time_limit(0);
if (function_exists('pcntl_signal')) {
$queue_runner = \Drupal::service('waiting_queue.signal_queue_runner_service');
}
else {
$queue_runner = \Drupal::service('waiting_queue.queue_runner_service');
}
$queue_runner->processQueue($queue_name);
}
function waiting_queue_test_signals() {
// var_dump(pcntl_signal_get_handler(SIGINT));
$sh = new WaitingQueueSignalHandler([SIGTERM, SIGINT]);
pcntl_alarm(300);
// loop forever performing tasks
$seconds = 0;
while (1) {
$sh->startJob($seconds);
// do something interesting here
usleep(4000000);
$seconds += 4;
print("This program has been running for $seconds seconds\n");
$sh->finishJob();
}
}
class WaitingQueueSignalHandler {
/**
* Flag indicating that a restart is necessary.
*
* @var bool
*/
protected $rebootRequired = FALSE;
/**
* Flag indicating that a job is currently being processed.
*
* @var bool
*/
protected $processing = FALSE;
/**
* An array of all the signals for which handlers have been registered.
*
* @var array
*/
protected $signals = array();
/**
* State variable that tracks if an interrupt has already been sent once on
* this job cycle.
*
* By keeping track of this, we can allow external actors to forcibly shut
* interrupt a job while in process without having to resort to SIGKILL.
*
* We do not reuse $rebootRequired in order to clearly separate the expected
* SIGALRM case from an unexpected external signal.
*
* @var bool
*/
protected $interrupted = FALSE;
/**
* Creates a new signal handler helper, attaching to it the set of desired
* signals.
*
* @param array $rebootSignals
* An array of signals (e.g., SIGINT, SIGTERM, SIGHUP). The only disallowed
* signal is SIGALRM, as waiting_queue uses it internally.
*/
public function __construct(array $rebootSignals = array()) {
$this->installHandlers($rebootSignals);
}
/**
* Informs the signal handler that job processing is commencing.
*
* @param mixed $item
* The job payload that is about to be processed. There are no restrictions
* on payload format, though they tend to be serialize()d strings.
*/
public function startJob($item) {
$this->processing = TRUE;
}
/**
* Informs the signal handler that job processing has finished.
*
* Calling this method does not imply success or failure of the job, only that
* processing has ceased.
*/
public function finishJob() {
$this->processing = FALSE;
// Reset interrupt state, even though it shouldn't matter.
$this->interrupted = FALSE;
if (TRUE === $this->rebootRequired) {
$this->gracefulExit();
}
}
/**
* Informs the signal handler that a reboot is needed at the next possible
* opportunity.
*/
public function rebootRequired() {
$this->rebootRequired = TRUE;
}
/**
* Processes an incoming signal.
*
* This is the main signal handler method, used to process all signals except
* SIGALRM.
*
* @param integer $signal
* The signal sent, as an integer.
*/
public function signalHandler($signal) {
print("signal received!");
if (TRUE === $this->processing) {
$this->rebootRequired();
if (SIGINT === $signal) {
if (FALSE === $this->interrupted) {
print("A job is currently being processed; the worker will be shut down when the job is finished. Press Ctrl-C again to quit immediately without waiting for the job to complete.\n");
$this->interrupted = TRUE;
}
else {
print("Shutting down immediately...\n");
$this->gracefulExit();
}
}
}
else {
$this->gracefulExit();
}
}
/**
* Processes an incoming SIGALRM, used for intentional timed self-termination.
*
* @param integer $signal
* The integer value of SIGALRM (14).
*/
public function alarmHandler($signal) {
if (TRUE === $this->processing) {
print("Alarm received, job is processing, exit soon\n");
$this->rebootRequired();
}
else {
print("Alarm received, exiting now\n");
$this->gracefulExit();
}
}
/**
* Terminate the current process as safely as possible.
*/
protected function gracefulExit() {
// Simply exit. Child classes should put cleanup logic here, if needed.
exit();
}
/**
* Registers signal handlers with the current PHP process.
*
* @param array $rebootSignals
*/
protected function installHandlers($rebootSignals) {
$this->signals = $rebootSignals;
declare(ticks = 1);
foreach ($rebootSignals as $signal) {
pcntl_signal($signal, array($this, 'signalHandler'));
}
pcntl_signal(SIGALRM, array($this, 'alarmHandler'));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment