Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Both files go into custom/include/SugarQueue, then run a Repair to regenerate cache/file_map.php and you're good to go.
<?php
if (!defined('sugarEntry') || !sugarEntry) {
die('Not A Valid Entry Point');
}
require_once 'include/SugarQueue/SugarCronJobs.php';
require_once 'custom/include/SugarQueue/SugarJobFilteredQueue.php';
// Sample configuration:
//
// $sugar_config['cron_class'] = 'SugarCronFilteredJobs';
// $sugar_config['cron']['filters'] = array(
// 'marketo' => array(
// 'max_cron_jobs' => 1, // Jobs processed per run
// 'max_cron_runtime' => 3600, // value in seconds, 60 minutes
// 'min_cron_interval' => 0,
// 'enforce_runtime' => true,
// 'targets' => array(
// 'class::SugarJobUpdateMarketo',
// 'function::marketoSync',
// 'function::marketoActivityLogSync',
// 'class::SugarJobGetMarketoActivities'
// ),
// ),
// 'fast' => array(
// 'max_cron_jobs' => 50,
// 'targets' => array(
// 'function::PMSEJobRun',
// 'class::SugarJobUpdateForecastWorksheets',
// 'class::\Sugarcrm\Sugarcrm\Elasticsearch\Queue\Scheduler',
// 'function::processQueue',
// 'function::processWorkflow'
// ),
// ),
// 'default' => array(
// 'max_cron_jobs' => 10, 'max_cron_runtime' => 600, 'enforce_runtime' => true,
// 'targets' => array() // Process first come first serve, any target not listed in another filter
// ),
// );
class SugarCronFilteredJobs extends SugarCronJobs {
// Should we dump out debug information to the cron log?
const CONSOLE_DEBUG = true;
// Which INI setting are we using to pass the named filter information? Default: user_agent
const INI_STRING = 'user_agent';
// Which defined filter have we been created to handle
protected $filter;
// Disable them by default, otherwise multiple crons will put duplicates into the queue
public $disable_schedulers = true;
public function __construct() {
global $sugar_config;
// Usage: php -d user_agent=<filter> -f cron.php
$this->filter = ini_get(self::INI_STRING) ?: 'default';
if (is_array($sugar_config['cron']['filters'][$this->filter])) {
$filterConfig = $sugar_config['cron']['filters'][$this->filter];
if (isset($filterConfig['max_cron_jobs'])) {
$this->max_jobs = $filterConfig['max_cron_jobs'];
}
if (isset($filterConfig['max_cron_runtime'])) {
$this->max_runtime = $filterConfig['max_cron_runtime'];
}
if (isset($filterConfig['min_cron_interval'])) {
$this->min_interval = $filterConfig['min_cron_interval'];
}
if (isset($filterConfig['enforce_runtime'])) {
$this->enforceHardLimit = $filterConfig['enforce_runtime'];
}
$this->lockfile = sugar_cached("modules/Schedulers/lastrun_" . $this->filter);
$this->queue = new SugarJobFilteredQueue();
$this->queue->targets = $filterConfig['targets'] ?: array();
// If we're passing an empty target array, we need to exclude all the other named targets
if (empty($filterConfig['targets'])) {
// Enable scheduler processing for the 'default' bucket
$this->disable_schedulers = false;
foreach ($sugar_config['cron']['filters'] as $f => $c) {
if ($f != $this->filter) {
$this->queue->addExcludeTargets($c['targets']);
}
}
}
if (self::CONSOLE_DEBUG) {
printf("%s CRON driver SugarCronFilteredJobs with filter: %s (max %d)" . PHP_EOL,
strftime('%c'), $this->filter, $this->max_jobs);
}
} else {
// If we haven't configured our filter, just process the queue as normal
// We could just assume 'default' here, but why bother? Configure it.
if (self::CONSOLE_DEBUG) {
print(strftime('%c') . " SugarCronFilteredJobs missing configuration! Running as normal." . PHP_EOL);
}
parent::__construct();
}
}
/**
* Return ID for this client
* @return string
*/
public function getMyId() {
return 'CRON' . $GLOBALS['sugar_config']['unique_key'] . ':' . $this->filter . ':' . getmypid();
}
}
<?php
if (!defined('sugarEntry') || !sugarEntry) {
die('Not A Valid Entry Point');
}
require_once 'include/SugarQueue/SugarJobQueue.php';
class SugarJobFilteredQueue extends SugarJobQueue {
// Should we dump out debug information to the cron log?
const CONSOLE_DEBUG = false;
// An array of job targets we're going to look for
public $targets = array();
// Include a lust of targets we don't want to process, as they're
// handed by other filtersets
public $excludeTargets = array();
/**
* Fetch the next job in the queue and mark it running
* @param string $clientID ID of the client requesting the job
* @return SugarJob
*/
public function nextJob($clientID) {
$now = $this->db->now();
$queued = SchedulersJob::JOB_STATUS_QUEUED;
$try = $this->jobTries;
$extra = ' ';
while ($try--) {
if (count($this->targets)) {
$extra .= "AND target IN ('" . implode("','", $this->targets) . "') ";
}
if (count($this->excludeTargets)) {
$extra .= "AND target NOT IN ('" . implode("','", $this->excludeTargets) . "') ";
}
$sql = sprintf("SELECT id FROM %s WHERE execute_time <= %s AND status = '%s'%sAND deleted = 0 ORDER BY execute_time ASC",
$this->job_queue_table, $now, $queued, $extra);
if (self::CONSOLE_DEBUG) {
print(strftime('%c') . " > Filter SQL: {$sql}") . PHP_EOL;
}
$id = $this->db->getOne($sql);
if (empty($id)) {
return null;
}
$job = BeanFactory::getBean('SchedulersJobs', $id);
if (empty($job->id)) {
return null;
}
if (self::CONSOLE_DEBUG) {
print(strftime('%c') . " > Found {$job->name}, trying to run ...") . PHP_EOL;
}
$job->status = SchedulersJob::JOB_STATUS_RUNNING;
$job->client = $clientID;
$client = $this->db->quote($clientID);
$res = $this->db->query("UPDATE {$this->job_queue_table} SET status='{$job->status}', date_modified=$now, client='$client' WHERE id='{$job->id}' AND status='$queued'");
if ($this->db->getAffectedRowCount($res) == 0) {
// somebody stole our job, try again
continue;
} else {
$job->save();
break;
}
}
return $job;
}
public function addExcludeTargets($array) {
$this->excludeTargets = array_merge($this->excludeTargets, $array);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.