Created
August 8, 2017 20:04
-
-
Save mikeyp/3623c5186dba5152f109ab45f5aad4d0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
class WaitingQueueSignalHandler { | |
/** | |
* Flag indicating that a restart is necessary. | |
* | |
* @var bool | |
*/ | |
protected $rebootRequired = FALSE; | |
/** | |
* Flag indicating that a job is currently being processed. | |
* | |
* @var bool | |
*/ | |
protected $processing = FALSE; | |
/** | |
* An array of all the signals for which handlers have been registered. | |
* | |
* @var array | |
*/ | |
protected $signals = array(); | |
/** | |
* State variable that tracks if an interrupt has already been sent once on | |
* this job cycle. | |
* | |
* By keeping track of this, we can allow external actors to forcibly shut | |
* interrupt a job while in process without having to resort to SIGKILL. | |
* | |
* We do not reuse $rebootRequired in order to clearly separate the expected | |
* SIGALRM case from an unexpected external signal. | |
* | |
* @var bool | |
*/ | |
protected $interrupted = FALSE; | |
/** | |
* Creates a new signal handler helper, attaching to it the set of desired | |
* signals. | |
* | |
* @param array $rebootSignals | |
* An array of signals (e.g., SIGINT, SIGTERM, SIGHUP). The only disallowed | |
* signal is SIGALRM, as waiting_queue uses it internally. | |
*/ | |
public function __construct(array $rebootSignals = array()) { | |
$this->installHandlers($rebootSignals); | |
} | |
/** | |
* Informs the signal handler that job processing is commencing. | |
* | |
* @param mixed $item | |
* The job payload that is about to be processed. There are no restrictions | |
* on payload format, though they tend to be serialize()d strings. | |
*/ | |
public function startJob($item) { | |
$this->processing = TRUE; | |
} | |
/** | |
* Informs the signal handler that job processing has finished. | |
* | |
* Calling this method does not imply success or failure of the job, only that | |
* processing has ceased. | |
*/ | |
public function finishJob() { | |
$this->processing = FALSE; | |
// Reset interrupt state, even though it shouldn't matter. | |
$this->interrupted = FALSE; | |
if (TRUE === $this->rebootRequired) { | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Informs the signal handler that a reboot is needed at the next possible | |
* opportunity. | |
*/ | |
public function rebootRequired() { | |
$this->rebootRequired = TRUE; | |
} | |
/** | |
* Processes an incoming signal. | |
* | |
* This is the main signal handler method, used to process all signals except | |
* SIGALRM. | |
* | |
* @param integer $signal | |
* The signal sent, as an integer. | |
*/ | |
public function signalHandler($signal) { | |
if (TRUE === $this->processing) { | |
$this->rebootRequired(); | |
if (SIGINT === $signal) { | |
if (FALSE === $this->interrupted) { | |
print("A job is currently being processed; the worker will be shut down when the job is finished. Press Ctrl-C again to quit immediately without waiting for the job to complete.\n"); | |
$this->interrupted = TRUE; | |
} | |
else { | |
print("Shutting down immediately...\n"); | |
$this->gracefulExit(); | |
} | |
} | |
} | |
else { | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Processes an incoming SIGALRM, used for intentional timed self-termination. | |
* | |
* @param integer $signal | |
* The integer value of SIGALRM (14). | |
*/ | |
public function alarmHandler($signal) { | |
if (TRUE === $this->processing) { | |
print("Alarm received, job is processing, exit soon\n"); | |
$this->rebootRequired(); | |
} | |
else { | |
print("Alarm received, exiting now\n"); | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Terminate the current process as safely as possible. | |
*/ | |
protected function gracefulExit() { | |
// Simply exit. Child classes should put cleanup logic here, if needed. | |
exit(); | |
} | |
/** | |
* Registers signal handlers with the current PHP process. | |
* | |
* @param array $rebootSignals | |
*/ | |
protected function installHandlers($rebootSignals) { | |
$this->signals = $rebootSignals; | |
declare(ticks = 1); | |
foreach ($rebootSignals as $signal) { | |
pcntl_signal($signal, array($this, 'signalHandler')); | |
} | |
pcntl_signal(SIGALRM, array($this, 'alarmHandler')); | |
} | |
} | |
$sh = new WaitingQueueSignalHandler([SIGTERM, SIGINT]); | |
pcntl_alarm(300); | |
// loop forever performing tasks | |
$seconds = 0; | |
while (1) { | |
$sh->startJob($seconds); | |
// do something interesting here | |
usleep(4000000); | |
$seconds += 4; | |
print("This program has been running for $seconds seconds\n"); | |
$sh->finishJob(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Implements hook_drush_help(). | |
*/ | |
function waiting_queue_drush_help($section) { | |
switch ($section) { | |
case 'drush:process-waiting-queue': | |
return dt('Run the named, waiting Drupal queue. Allows for processing queues that block indefinitely waiting for new jobs without holding up other queues.'); | |
} | |
} | |
/** | |
* Implements hook_drush_command(). | |
*/ | |
function waiting_queue_drush_command() { | |
$items['process-waiting-queue'] = array( | |
'callback' => 'waiting_queue_process_queue', | |
'description' => 'Run the named, waiting Drupal queue.', | |
'arguments' => array( | |
'queue_to_process' => 'The name of the queue to run.', | |
), | |
); | |
$items['test-signals'] = array( | |
'callback' => 'waiting_queue_test_signals', | |
'description' => '', | |
'bootstrap' => DRUSH_BOOTSTRAP_NONE, | |
); | |
return $items; | |
} | |
/** | |
* Gateway to either a signal or non-signal based waiting_queue implementation. | |
* | |
* Signal handling gives us a much more robust and precise queue, so we use it | |
* if it's at all available. | |
* | |
* @param string $queue_name | |
*/ | |
function waiting_queue_process_queue($queue_name) { | |
set_time_limit(0); | |
if (function_exists('pcntl_signal')) { | |
$queue_runner = \Drupal::service('waiting_queue.signal_queue_runner_service'); | |
} | |
else { | |
$queue_runner = \Drupal::service('waiting_queue.queue_runner_service'); | |
} | |
$queue_runner->processQueue($queue_name); | |
} | |
function waiting_queue_test_signals() { | |
// var_dump(pcntl_signal_get_handler(SIGINT)); | |
$sh = new WaitingQueueSignalHandler([SIGTERM, SIGINT]); | |
pcntl_alarm(300); | |
// loop forever performing tasks | |
$seconds = 0; | |
while (1) { | |
$sh->startJob($seconds); | |
// do something interesting here | |
usleep(4000000); | |
$seconds += 4; | |
print("This program has been running for $seconds seconds\n"); | |
$sh->finishJob(); | |
} | |
} | |
class WaitingQueueSignalHandler { | |
/** | |
* Flag indicating that a restart is necessary. | |
* | |
* @var bool | |
*/ | |
protected $rebootRequired = FALSE; | |
/** | |
* Flag indicating that a job is currently being processed. | |
* | |
* @var bool | |
*/ | |
protected $processing = FALSE; | |
/** | |
* An array of all the signals for which handlers have been registered. | |
* | |
* @var array | |
*/ | |
protected $signals = array(); | |
/** | |
* State variable that tracks if an interrupt has already been sent once on | |
* this job cycle. | |
* | |
* By keeping track of this, we can allow external actors to forcibly shut | |
* interrupt a job while in process without having to resort to SIGKILL. | |
* | |
* We do not reuse $rebootRequired in order to clearly separate the expected | |
* SIGALRM case from an unexpected external signal. | |
* | |
* @var bool | |
*/ | |
protected $interrupted = FALSE; | |
/** | |
* Creates a new signal handler helper, attaching to it the set of desired | |
* signals. | |
* | |
* @param array $rebootSignals | |
* An array of signals (e.g., SIGINT, SIGTERM, SIGHUP). The only disallowed | |
* signal is SIGALRM, as waiting_queue uses it internally. | |
*/ | |
public function __construct(array $rebootSignals = array()) { | |
$this->installHandlers($rebootSignals); | |
} | |
/** | |
* Informs the signal handler that job processing is commencing. | |
* | |
* @param mixed $item | |
* The job payload that is about to be processed. There are no restrictions | |
* on payload format, though they tend to be serialize()d strings. | |
*/ | |
public function startJob($item) { | |
$this->processing = TRUE; | |
} | |
/** | |
* Informs the signal handler that job processing has finished. | |
* | |
* Calling this method does not imply success or failure of the job, only that | |
* processing has ceased. | |
*/ | |
public function finishJob() { | |
$this->processing = FALSE; | |
// Reset interrupt state, even though it shouldn't matter. | |
$this->interrupted = FALSE; | |
if (TRUE === $this->rebootRequired) { | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Informs the signal handler that a reboot is needed at the next possible | |
* opportunity. | |
*/ | |
public function rebootRequired() { | |
$this->rebootRequired = TRUE; | |
} | |
/** | |
* Processes an incoming signal. | |
* | |
* This is the main signal handler method, used to process all signals except | |
* SIGALRM. | |
* | |
* @param integer $signal | |
* The signal sent, as an integer. | |
*/ | |
public function signalHandler($signal) { | |
print("signal received!"); | |
if (TRUE === $this->processing) { | |
$this->rebootRequired(); | |
if (SIGINT === $signal) { | |
if (FALSE === $this->interrupted) { | |
print("A job is currently being processed; the worker will be shut down when the job is finished. Press Ctrl-C again to quit immediately without waiting for the job to complete.\n"); | |
$this->interrupted = TRUE; | |
} | |
else { | |
print("Shutting down immediately...\n"); | |
$this->gracefulExit(); | |
} | |
} | |
} | |
else { | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Processes an incoming SIGALRM, used for intentional timed self-termination. | |
* | |
* @param integer $signal | |
* The integer value of SIGALRM (14). | |
*/ | |
public function alarmHandler($signal) { | |
if (TRUE === $this->processing) { | |
print("Alarm received, job is processing, exit soon\n"); | |
$this->rebootRequired(); | |
} | |
else { | |
print("Alarm received, exiting now\n"); | |
$this->gracefulExit(); | |
} | |
} | |
/** | |
* Terminate the current process as safely as possible. | |
*/ | |
protected function gracefulExit() { | |
// Simply exit. Child classes should put cleanup logic here, if needed. | |
exit(); | |
} | |
/** | |
* Registers signal handlers with the current PHP process. | |
* | |
* @param array $rebootSignals | |
*/ | |
protected function installHandlers($rebootSignals) { | |
$this->signals = $rebootSignals; | |
declare(ticks = 1); | |
foreach ($rebootSignals as $signal) { | |
pcntl_signal($signal, array($this, 'signalHandler')); | |
} | |
pcntl_signal(SIGALRM, array($this, 'alarmHandler')); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment