Skip to content

Instantly share code, notes, and snippets.

@shupp
Created May 7, 2011 23:46
Show Gist options
  • Save shupp/960974 to your computer and use it in GitHub Desktop.
Save shupp/960974 to your computer and use it in GitHub Desktop.
Kestrel Consumer
<?php
/**
 * Enterprise queue consumer interface, called by bin/consumer_cli.php
 *
 * Parses the command line options, then polls a kestrel queue and hands each
 * job it receives to bin/ecli.php, which is run as a child process.
 *
 * @author Bill Shupp <hostmaster@shupp.org>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Consumer
{
    /**
     * Instance of {@link Zend_Console_Getopt}
     *
     * @var Zend_Console_Getopt
     */
    protected $_opt = null;

    /**
     * Which APPLICATION_ENV to run under (see -e)
     *
     * @var string
     */
    protected $_environment = null;

    /**
     * The kestrel server IP
     *
     * @var string
     */
    protected $_host = null;

    /**
     * The kestrel server port
     *
     * @var int
     */
    protected $_port = null;

    /**
     * The kestrel queue name to connect to
     *
     * @var string
     */
    protected $_queue = null;

    /**
     * Whether we should show debug output
     *
     * @var bool
     */
    protected $_debug = false;

    /**
     * Maximum # of jobs for this process to perform (for memory fail safe).
     * Null means "no limit".
     *
     * @var int|null
     */
    protected $_maxJobs = null;

    /**
     * Current job count
     *
     * @var int
     */
    protected $_jobCount = 0;

    /**
     * Parses arguments from the command line and does error handling
     *
     * Option errors are not propagated to the caller: the message and the
     * usage text are echoed, and the process exits with status 1 unless
     * APPLICATION_ENV is 'testing'.
     *
     * @param array $argv The $argv from bin/ecli.php
     *
     * @return void
     */
    public function __construct(array $argv)
    {
        // Initialised before the try block so the catch handler can tell
        // whether the option parser exists before asking it for usage text.
        $opt = null;

        try {
            $opt = new Zend_Console_Getopt(
                array(
                    'environment|e=s' => 'environment name (e.g. development)'
                                       . ', required',
                    'server|s=s'      => 'kestrel server, format of host:port'
                                       . ', required',
                    'queue|q=s'       => 'queue name (e.g. crawler_campaign)'
                                       . ', required',
                    'max-jobs|m=s'    => 'max jobs to run before exiting'
                                       . ', optional',
                    'debug|d'         => 'show debug output'
                                       . ', optional',
                )
            );
            $opt->setArguments($argv);
            $opt->parse();

            // Set environment
            if ($opt->e === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing environment'
                );
            }
            $this->_environment = $opt->e;

            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV')) {
                define('APPLICATION_ENV', $this->_environment);
            }
            // @codeCoverageIgnoreEnd

            // Set server
            if ($opt->s === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing server'
                );
            }
            $server      = $this->_parseServer($opt->s);
            $this->_host = $server[0];
            $this->_port = $server[1];

            // Set queue
            if ($opt->q === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing queue'
                );
            }
            $this->_queue = $opt->q;

            // Set max-jobs. The option arrives as a string; it is validated
            // and stored as an int so that the limit check in _keepRunning()
            // is a plain integer comparison.
            if ($opt->m !== null) {
                if (!ctype_digit((string) $opt->m)) {
                    throw new Zend_Console_Getopt_Exception(
                        'Error: invalid max-jobs: ' . $opt->m
                    );
                }
                $this->_maxJobs = (int) $opt->m;
            }

            // Set debug
            if ($opt->d !== null) {
                $this->_debug = true;
            }
        } catch (Zend_Console_Getopt_Exception $e) {
            echo "\n" . $e->getMessage() . "\n\n";
            // $opt is still null if constructing the parser itself failed.
            if ($opt !== null) {
                echo $opt->getUsageMessage();
            }
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') {
                exit(1);
            }
            // @codeCoverageIgnoreEnd
        }

        $this->_opt = $opt;
    }

    /**
     * Polls the queue server for jobs and runs them as they come in
     *
     * Loops until the job limit (if any) is reached. Jobs missing the
     * 'instance' or 'jobName' key are reported and skipped. A job whose
     * command exits non-zero has its reliable read aborted.
     *
     * @return void
     */
    public function run()
    {
        $client = $this->_getKestrelClient();
        $queue  = 'enterprise_' . $this->_queue;

        while ($this->_keepRunning()) {
            // Pull job from queue
            $job = $client->reliableRead($queue, 500);
            if ($job === false) {
                $this->_debug('Nothing on queue ' . $queue);
                continue;
            }

            if (!isset($job['instance'])) {
                echo 'Instance not set in queue job: ' . print_r($job, true);
                continue;
            }
            if (!isset($job['jobName'])) {
                echo 'Job name not set in queue job: ' . print_r($job, true);
                continue;
            }

            $data = isset($job['data']) ? $job['data'] : null;

            // Run the job
            $returnCode = $this->runJob($job['instance'], $job['jobName'], $data);
            if ($returnCode !== 0) {
                $client->abortReliableRead($queue);
            }
        }

        $client->closeReliableRead($queue);
    }

    /**
     * Runs the job via bin/ecli.php
     *
     * Every argument is passed through escapeshellarg() because the instance
     * and job name come from the queue payload and must never be interpreted
     * by the shell.
     *
     * @param string $instance The instance name to run the job under
     * @param string $jobName  The job name
     * @param mixed  $data     Optional extra data; when truthy it is JSON
     *                         encoded, base64 encoded and appended as the
     *                         final argument
     *
     * @return int The exit code of the command
     */
    public function runJob($instance, $jobName, $data)
    {
        $cmd = escapeshellarg(BASE_PATH . '/bin/ecli.php')
             . ' -e ' . escapeshellarg($this->_environment)
             . ' -i ' . escapeshellarg($instance)
             . ' -j ' . escapeshellarg($jobName);

        if ($data) {
            $cmd .= ' ' . escapeshellarg(base64_encode(json_encode($data)));
        }

        $returnCode = $this->_passthru($cmd);

        // Failed jobs count towards the limit as well.
        $this->_jobCount++;
        $this->_debug('Job count: ' . $this->_jobCount);

        return $returnCode;
    }

    /**
     * Splits and validates a "host:port" server string
     *
     * @param string $server The value of the -s option
     *
     * @throws Zend_Console_Getopt_Exception if the value is not host:port
     *                                       with a non-empty host and an
     *                                       all-digit port
     * @return array array(string $host, int $port)
     */
    protected function _parseServer($server)
    {
        $parts = explode(':', $server);

        if (count($parts) !== 2
            || $parts[0] === ''
            || !ctype_digit($parts[1])
        ) {
            throw new Zend_Console_Getopt_Exception(
                'Error: invalid server: ' . $server
            );
        }

        return array($parts[0], (int) $parts[1]);
    }

    /**
     * Check to see if the job limit has been reached
     *
     * @return bool True while no limit is set or the job count is below it
     */
    protected function _keepRunning()
    {
        if ($this->_maxJobs === null) {
            return true;
        }

        return $this->_jobCount < $this->_maxJobs;
    }

    /**
     * Show debug messages
     *
     * @param mixed $message The message to echo when debug output is enabled
     *
     * @return void
     */
    protected function _debug($message)
    {
        if (!$this->_debug) {
            return;
        }

        echo $message . "\n";
    }

    // @codeCoverageIgnoreStart
    /**
     * Calls the passthru() function and returns the exit code. Abstracted
     * for testing.
     *
     * @param string $cmd The command to execute
     *
     * @return int
     */
    protected function _passthru($cmd)
    {
        passthru($cmd, $returnCode);
        return $returnCode;
    }

    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @throws Exception when called under the 'testing' environment
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }

        return new EC_KestrelClient($this->_host, $this->_port);
    }
    // @codeCoverageIgnoreEnd
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment