Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
use anonymous function and an array of args to create a series of sub process to do the job,no need to care about process management api.
<?php
/**
* @example
*
* $a = new ProcessExecutor();
* $b = function ($arg){ sleep(1);echo "hello $arg\n"; };
* $a->setRunnables($b,['aaa','bbb']);
* $a->setCheckStatus(function($arg){return $arg === 'aaa';});
* $a->execute();
* var_dump($a->getRunStatusArr());
* var_dump($a->getRunTimes());
*/
class ProcessExecutor {
const FORK_INTERVAL = 1;//s,两次fork的间隔
const STATUS_SUCCESS = 1;
const STATUS_FAIL = 0;
private $maxTimes = 3;
private $runnables = [];
private $args = [];
private $processes = [];
private $runTimes = [];
private $runStatusArr = [];
private $retryInterval = 0;
private $checkStatus;
private $onSuccess;
private $onFail;
/**
* drupal中需要:
* \Database::closeConnection(); // 防止子进程共享数据库连接导致fatalError
* \Redis_Client::reset(); // 防止子进程共享父进程的redis连接导致protocol error
*/
public function prepareFork() { }
private function forkRun($runnable) {
$this->prepareFork();
$hash = spl_object_hash($runnable);
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
}
elseif (!$pid) { // child
call_user_func($runnable, $this->args[$hash]);
exit();
}
else { // parent
$this->processes[$pid] = $hash;
$this->runTimes[$hash]++;
sleep(self::FORK_INTERVAL); // 两次fork最好有间隔
}
return $pid;
}
public function execute() {
// 初始化所有需要跑的进程
foreach ( $this->runnables as $runnable ) {
$pid = $this->forkRun($runnable);
}
$success = $fail = 0;
// parent
while ( $success + $fail < count($this->runnables) ) { // 判断为成功或判断为失败的任务达到总任务数
$pid = pcntl_wait($status, WUNTRACED);
$hash = $this->processes[$pid];
$runnable = $this->runnables[$hash];
$arg = $this->args[$hash];
$result = empty($this->checkStatus) ? true : call_user_func($this->checkStatus, $arg);
if ($result === true) {
$this->runStatusArr[$hash] = self::STATUS_SUCCESS;
$success++;
if(!empty($this->onSuccess)){
call_user_func($this->onSuccess,$arg);
}
}
elseif ($this->runTimes[spl_object_hash($runnable)] >= $this->maxTimes && $result === false) {
$fail++;
$this->runStatusArr[$hash] = self::STATUS_FAIL;
}
else {
if(!empty($this->onFail)){
call_user_func($this->onFail,$arg);
}
sleep($this->retryInterval);
$new_pid = $this->forkRun($runnable);
}
}
}
public function getMaxTimes() {
return $this->maxTimes;
}
public function getRetryInterval() {
return $this->retryInterval;
}
public function setMaxTimes($maxTimes) {
$this->maxTimes = $maxTimes;
}
public function setRunnables($runnable, $args) {
if (!is_callable($runnable)) {
throw new Exception('not callable');
}
foreach ( $args as $key => $arg ) {
$runner = new RunnableWrapper($runnable);
$hash = spl_object_hash($runner);
$this->runnables[$hash] = $runner;
$this->runTimes[$hash] = 0;
$this->args[$hash] = $arg;
}
}
public function setOnFail($onFail) {
$this->onFail = $onFail;
}
public function setOnSuccess($onSuccess) {
$this->onSuccess = $onSuccess;
}
public function setRetryInterval($retryInterval) {
$this->retryInterval = $retryInterval;
}
public function setCheckStatus($checkStatus) {
if(!is_callable($checkStatus)){
throw new Exception("check status not callable");
}
$this->checkStatus = $checkStatus;
}
public function getRunStatusArr() {
return $this->runStatusArr;
}
public function getRunTimes(){
return $this->runTimes;
}
}
class RunnableWrapper {
private $callable;
public function __construct($callable) {
$this->callable = $callable;
}
public function __invoke($args) {
call_user_func($this->callable, $args);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment