Skip to content

Instantly share code, notes, and snippets.

@shendongming
Created October 22, 2013 10:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shendongming/7098369 to your computer and use it in GitHub Desktop.
Save shendongming/7098369 to your computer and use it in GitHub Desktop.
<?
/**
* 优先级任务管理
* Class Muti_Task
*/
class Muti_Task {
public $queue = array();
private $status = 0;
private $time_min = 10000;
private $time_max = 0;
private $callbacks = array();
//每次调度一个
private $calls = array();
private $fun_info = array(); //未调度
private $fun_id_counter = 0; //10s
private $_schedule_ids = array(); //10s
//id计数器
public function schedule() {
while(true) {
$time = $this->getTime();
if(empty($this->_schedule_ids)) {
break;
}
//按照时刻排序
ksort($this->_schedule_ids);
$key_time = key($this->_schedule_ids);
if($key_time > $time) {
//时间未到,休息指定的时刻
usleep(($key_time - $time) * 1000);
$time = $this->getTime();
}
$ids = $this->_schedule_ids[$key_time];
//删除 没有用的 定时器
unset($this->_schedule_ids[$key_time]);
if(!$ids) {
//当前 定时器 为空
continue;
}
if($ids) {
foreach($ids as $id) {
$info = $this->fun_info[$id];
//执行调度,让这些回调函数生成任务
try {
$this->fun_info[$id]['last_time'] = $time;
//$t1=microtime(1);
call_user_func_array($info['fun'], array($this));
//$t2=microtime(1);
}
catch(\Exception $e) {
print_r($e);
}
$add_pos = $this->addschedule($info);
$is_stop = 0;
//执行调度后,会生成多个任务,获取任务执行,需要把这个全部执行完
if($this->queue) {
while(count($this->queue)) {
$task = array_shift($this->queue);
//debug
try {
//echo "time:" . date('Y-m-d H:i:s') . "\n";
call_user_func_array($task[0], $task[1]);
}
catch(\Muti_Task_StopTaskException $e) {
$is_stop = 1;
}
catch(\Muti_Task_StopException $e) {
return false;
}
catch(Exception $e) {
print_r($e);
}
}
}
//停止当前任务
if($is_stop) {
unset($this->_schedule_ids[$add_pos[0]][$add_pos[1]]);
if(empty($this->_schedule_ids[$add_pos[0]])) {
unset($this->_schedule_ids[$add_pos[0]]);
}
}
}
}
}
}
/**
* 获取毫秒
* @return int
*/
public function getTime() {
return (int)(microtime(1) * 1000);
}
/**
* 这个某个对象加入调度队列
* @param $info
*/
private function addschedule($info) {
$id = $info['id'];
$time = $this->getTime() + $info['time'];
$pos = 0;
if(!isset($this->_schedule_ids[$time])) {
$this->_schedule_ids[$time] = array($id);
} else {
$pos = count($this->_schedule_ids[$time]);
$this->_schedule_ids[$time][$pos] = $id;
}
return array($time, $pos);
}
/**
* 将某个任务加入队列排队执行
* @param $task
*/
public function add($task, $param = array()) {
$this->queue[] = array($task, $param);
}
/**
* @param $fun 回调函数
* @param $time 时间间隔
*/
public function setInterval($fun, $time, $params = array()) {
$id = ++$this->fun_id_counter;
$this->fun_info[$id] = array('id' => $id, 'fun' => $fun, 'last_time' => -1, 'count' => 0, 'time' => $time);
$this->addschedule($this->fun_info[$id]);;
}
}
/**
* 停止所有
* Class Muti_Task_StopException
*/
class Muti_Task_StopException extends Exception {
}
/**
* 停止当前定时器
* Class Muti_Task_StopTaskException
*/
class Muti_Task_StopTaskException extends Exception {
}
class Muti_Task_Test {
/**
* 测试代码
*/
public function testrun() {
//test
$task = new Muti_Task();
//每隔1.2s
$task->setInterval(function ($task) {
$task->add(function () {
echo "tast3 run\n";
});
}, 1200);
$task->setInterval(function ($task) {
$task->add(function () {
echo "tast4 run\n";
sleep(3);
});
}, 2000);
//每隔1.2s 执行一次队列1
$task->setInterval(function ($task) {
$task->add(function () {
echo "tast1 run\n";
}, array('a', 'b', 'c'));
}, 4000);
$task->setInterval(function ($task) {
$task->add(function () {
echo "tast2 run\n";
throw new Muti_Task_StopException();
});
}, 8000);
$task->schedule();
}
}
//$a= new Muti_Task_Test();
//$a->testrun();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment