Skip to content

Instantly share code, notes, and snippets.

@josue
Last active October 13, 2015 02:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josue/ae7a017bbd4e92671007 to your computer and use it in GitHub Desktop.
Save josue/ae7a017bbd4e92671007 to your computer and use it in GitHub Desktop.
Test Jobs - Send Resque Jobs to Redis
<?php
// The job pusher talks to Redis through the Redis class; stop right away when that class is not available.
if (class_exists('Redis') === false) {
    throw new Exception('Requires PHP Redis extension. Install via: sudo apt-get install php5-redis');
}
/**
 * Pushes test jobs onto a Resque-style Redis list ("resque:queue:<queue>").
 *
 * Describe the job with enqueue(), then push copies of it with runJobs().
 * When the 'autorun' option is truthy, the constructor reads the job description
 * from the command line or the query string and runs it immediately.
 */
class TestJobs {
    /** @var Redis|null Connection created by connectRedis(); null until a connection succeeds. */
    private $redisInstance = null;

    /** @var array Default connection settings merged with the current options (host, port, db). */
    private $redisConfig = array();

    /** @var array Options passed to the constructor and/or extracted by extractOptions(). */
    private $options = array();

    /**
     * Job template (queue, class, args) filled in by enqueue().
     *
     * Declared explicitly instead of being created on first assignment; it stays
     * public because an undeclared property was publicly readable.
     *
     * @var array
     */
    public $job_data = array();

    /** @var array Connection settings used when the options do not override them. */
    private $defaultRedisConfig = array(
        'host' => 'localhost',
        'port' => 6379,
        'db' => 0
    );

    /**
     * @param array $opts Connection settings (host, port, db) and flags (autorun, base64, interface, ...).
     *
     * @throws InvalidArgumentException When 'autorun' is set and no queue/class can be determined.
     * @throws Exception                When 'autorun' is set and Redis cannot be reached.
     */
    public function __construct($opts = array()) {
        $this->options = $opts;
        $this->redisConfig = array_merge($this->defaultRedisConfig, $this->options);
        if (!empty($this->options['autorun'])) {
            $this->autoRunJobs();
        }
    }

    /**
     * Returns one option value, or the default when the option is not set.
     *
     * @param string $name
     * @param mixed  $default
     *
     * @return mixed
     */
    private function option($name, $default = null) {
        return isset($this->options[$name]) ? $this->options[$name] : $default;
    }

    /**
     * Builds the "Redis: host:port:db" label used in log lines and error messages.
     *
     * @return string
     */
    private function redisDestination() {
        return sprintf(
            'Redis: %s',
            $this->redisConfig['host'].':'.$this->redisConfig['port'].':'.$this->redisConfig['db']
        );
    }

    /**
     * Opens the Redis connection and selects the configured database.
     *
     * @throws Exception When connecting or selecting the database fails; the cause is attached as "previous".
     */
    private function connectRedis() {
        $redis_destination = $this->redisDestination();
        try {
            $redis = new Redis();
            // Host and port are separate arguments; a combined "host:port" string is not a valid host.
            if ($redis->connect($this->redisConfig['host'], (int) $this->redisConfig['port']) === false) {
                throw new Exception('connect() returned false');
            }
            if ($redis->select((int) $this->redisConfig['db']) === false) {
                throw new Exception('select() returned false');
            }
            $this->redisInstance = $redis;
        } catch (Exception $e) {
            $this->log(__METHOD__.' -> Caught exception: '.$e->getMessage());
            throw new Exception('Unable to connect to '.$redis_destination, 0, $e);
        }
    }

    /**
     * Stores the job template that runJobs() will push.
     *
     * @param string|null  $queue Queue name (appended to "resque:queue:").
     * @param string|null  $class Job class name.
     * @param array|string $args  Job arguments, either as an array or as a JSON string
     *                            (base64-encoded JSON when the 'base64' option is set).
     *
     * @return $this
     *
     * @throws InvalidArgumentException When queue or class is empty, or when a string $args is not valid JSON.
     */
    public function enqueue($queue = null, $class = null, $args = array()) {
        // decode json string to array
        if (!empty($args) && is_string($args)) {
            if (!empty($this->options['base64'])) {
                $args = base64_decode($args);
            }
            $decoded = json_decode($args, true);
            // A null result is only an error when the decoder says so (the literal "null" is valid JSON).
            if ($decoded === null && json_last_error() !== JSON_ERROR_NONE) {
                throw new InvalidArgumentException('Job args are not valid JSON');
            }
            $args = $decoded;
        }
        $this->job_data = array(
            'queue' => $queue,
            'class' => $class,
            'args' => $args
        );
        $this->validateJobData();
        return $this;
    }

    /**
     * Pushes copies of the enqueued job onto the Redis list.
     *
     * When the job has no args, each copy gets generated sample args instead.
     *
     * @param int|string  $stop_at_count Number of jobs to push; anything that is not a number >= 1 means 100.
     * @param bool|string $show_log      Log every pushed payload; the strings "", "0", "false", "no", "off" mean false.
     *
     * @throws InvalidArgumentException When no valid job has been enqueued.
     * @throws Exception                When Redis cannot be reached.
     */
    public function runJobs($stop_at_count = 0, $show_log = false) {
        $this->validateJobData();
        $this->log($this->redisDestination());
        $this->connectRedis();

        // Normalise to an integer so that a value such as "2.5" can no longer make the loop run forever.
        $limit = is_numeric($stop_at_count) ? (int) $stop_at_count : 0;
        if ($limit < 1) {
            $limit = 100;
        }

        // Values arriving from the CLI or the query string are strings; "false" must not enable logging.
        if (is_string($show_log)) {
            $show_log = !in_array(strtolower(trim($show_log)), array('', '0', 'false', 'no', 'off'), true);
        } else {
            $show_log = (bool) $show_log;
        }

        $resque_queue = 'resque:queue:'.$this->job_data['queue'];
        $has_args = !empty($this->job_data['args']);
        $enqueued = 0;

        for ($count = 1; $count <= $limit; $count++) {
            $job_data = $this->job_data;
            $job_data['enqueued_ts'] = microtime(true);
            if (!$has_args) {
                $job_data['args'] = array(
                    '_ts' => time(),
                    'rand_seed' => $count.'.'.time().'.'.rand(1, 100000),
                    'filename' => sprintf('sample_%s.%s', $count, ($count % 2 ? 'jpg' : 'pdf')),
                );
            }
            // The payload carries the args wrapped in a one-element list.
            $job_data['args'] = array($job_data['args']);
            $job_data['id'] = md5($job_data['class'].json_encode($job_data['args']));
            $job_data_encoded = json_encode($job_data);

            if ($this->redisInstance->rpush($resque_queue, $job_data_encoded) === false) {
                $this->log(sprintf('(%s) Failed to push job to %s', $count, $resque_queue));
                continue;
            }
            $enqueued++;

            if ($show_log === true) {
                $this->log(sprintf('(%s) Queue: %s', $count, $job_data_encoded));
            }
        }

        $this->log(sprintf("Enqueued: %s", $enqueued));
    }

    /**
     * Ensures a queue and a class have been provided.
     *
     * @throws InvalidArgumentException When either value is empty.
     */
    private function validateJobData() {
        if (empty($this->job_data['queue']) || empty($this->job_data['class'])) {
            throw new InvalidArgumentException('Job is missing args: queue or class values');
        }
    }

    /**
     * Reads the job description from the CLI arguments or the query string and runs it.
     *
     * Options already given to the constructor are kept as fallbacks; extracted values win.
     *
     * @throws InvalidArgumentException When no queue or class is available.
     * @throws Exception                When Redis cannot be reached.
     */
    public function autoRunJobs() {
        $this->options = array_merge($this->options, self::extractOptions());
        $this->redisConfig = array_merge($this->defaultRedisConfig, $this->options);
        $this->log(__METHOD__.' = Yes');
        $this->enqueue(
            $this->option('queue'),
            $this->option('class'),
            $this->option('args')
        );
        $this->runJobs($this->option('jobs', 0), $this->option('show_log', false));
    }

    /**
     * Collects options from the command line (CLI SAPI) or from $_GET (when a queue is given).
     *
     * @return array Extracted options plus an 'interface' key ('cli' or 'browser');
     *               empty when neither source applies.
     */
    public static function extractOptions() {
        $options = array();
        // run via CLI
        if (php_sapi_name() === 'cli') {
            $parsed = getopt("", array(
                'host:', // --host=localhost
                'port:', // --port=6379
                'db:', // --db=3
                'class:', // --class=process_queue_add_assets
                'queue:', // --queue=Queue_Add_Asset
                'args:', // --args={json}
                'jobs:', // --jobs=10
                'show_log:', // --show_log=true
                'autorun:', // --autorun=1
                'base64:', // --base64=1
            ));
            // getopt() reports failure with false rather than an array.
            if (is_array($parsed)) {
                $options = $parsed;
            }
            $options['interface'] = 'cli';
        }
        // run via browser
        else if (!empty($_GET['queue'])) {
            $options = $_GET;
            $options['interface'] = 'browser';
        }
        return $options;
    }

    /**
     * Prints one log line.
     *
     * In browser mode the text is HTML-escaped (it can contain request-supplied values)
     * and followed by a <br>.
     *
     * @param mixed $data Text to print.
     */
    public function log($data = null) {
        $is_browser = !empty($this->options['interface']) && $this->options['interface'] === 'browser';
        if ($is_browser) {
            print htmlspecialchars((string) $data, ENT_QUOTES, 'UTF-8').'<br>'.PHP_EOL;
        } else {
            print $data.PHP_EOL;
        }
    }
}
/**
* Usage:
*
* via CLI: php test_jobs.php --queue=logger_job --class=LoggerJob --args='{ "data": { "josue": 1 } }' --jobs=500 --db=3 --host=127.0.0.1
*
* via browser: /test_jobs.php?host=127.0.0.1&db=3&queue=logger_job&class=LoggerJob&jobs=500
*/
// start the jobs: uncomment the next line to enqueue automatically when this file is executed
//new TestJobs(array('autorun'=>1));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment