|
<?php |
|
|
|
namespace Application\Tasks; |
|
|
|
/** |
|
* AsyncTaskAbstract allows to fork a php process running at PHP-FPM to do AsyncTasks |
|
* (Not tested with php-cgi, and probaly will not work with Apache mod_php and php-single-worker) |
|
* @requeriment PCNTL Process Control http://php.net/manual/pt_BR/book.pcntl.php |
|
* @requeriment POSIX http://php.net/manual/pt_BR/book.posix.php |
|
* @requeriment Semaphore, Shared Memory and IPC http://php.net/manual/pt_BR/book.sem.php |
|
* @note Will not work on Windows. Use pthreads instead http://php.net/manual/pt_BR/pthreads.installation.php. |
|
* @author Leonan Carvalho <j.leonancarvalho@gmail.com> |
|
*/ |
|
abstract class AsyncTaskAbstract { |
|
|
|
const SHARED_MEMORY_SIZE = 1024; |
|
|
|
/** |
|
* Semaphore ID (shared memory) |
|
* @var int |
|
*/ |
|
private static $shmId; |
|
|
|
/** |
|
* Code line to create a hash. |
|
* @var int |
|
*/ |
|
protected static $line; |
|
|
|
/** |
|
* File Class name for hash identifier. |
|
* @var string |
|
*/ |
|
protected static $file; |
|
|
|
/** |
|
* Class Name |
|
* @var string |
|
*/ |
|
protected static $name; |
|
|
|
/** |
|
* Directory to save log files. |
|
* @var string |
|
*/ |
|
protected static $_logdir; |
|
|
|
/** |
|
* Main process PID. |
|
* @var int |
|
*/ |
|
private $var_key_pid; |
|
|
|
/** |
|
* Task status. |
|
* @var int |
|
*/ |
|
private $var_key_status; |
|
|
|
/** |
|
* Fork process PID. |
|
* @var int |
|
*/ |
|
private $var_key_fork_pid; |
|
|
|
/** |
|
* |
|
* @var string |
|
*/ |
|
private $_log = ""; |
|
|
|
/** |
|
* Creates a new Async Task. |
|
* @throws RuntimeException |
|
*/ |
|
final public function __construct() { |
|
try { |
|
gc_enable(); |
|
$error = []; |
|
if (version_compare(PHP_VERSION, '5.6.0', '<') || defined('HHVM_VERSION')) { |
|
$error[] = "AsyncTask only supports PHP 5.6.0 and above"; |
|
} |
|
if (!extension_loaded('pcntl')) { |
|
$error[] = "Extension \"pcntl\" is required."; |
|
} |
|
if (!extension_loaded('posix')) { |
|
$error[] = "Extension \"posix\" is required"; |
|
} |
|
|
|
if (count($error) > 0) { |
|
$error[] = "Please fix errors bellow to use AsynTask"; |
|
throw new RuntimeException( |
|
implode(PHP_EOL, $error) |
|
); |
|
} |
|
|
|
self::$line = mt_rand(1, 65536); |
|
|
|
$reflect = new \ReflectionClass($this); |
|
self::$file = $reflect->getFileName(); |
|
self::$name = $reflect->getName(); |
|
|
|
if (!is_dir(self::$_logdir)) { |
|
mkdir(self::$_logdir, 0777, true); |
|
} |
|
//Create a shared memory |
|
self::$shmId = shm_attach((int) (ftok(self::$file, 'A') . self::$line), self::SHARED_MEMORY_SIZE); |
|
|
|
$this->var_key_pid = $this->alocatesharedMemory(getmypid(), 112112105100); //112112105100; |
|
$this->var_key_status = $this->alocatesharedMemory('PENDING', 11511697116117115); //11511697116117115; |
|
// $this->log("MainTask" . getmypid()); |
|
register_shutdown_function(array(&$this, 'FatalErrorCatch')); |
|
} catch (\Exception $e) { |
|
$sys = new \Exception("Task Init Fails", $e->getCode(), $e); |
|
} |
|
} |
|
|
|
/** |
|
* Creates a valid number for var_key |
|
* @param mixed $value |
|
* @return int |
|
*/ |
|
final private function getVar_Key() { |
|
$key = (int) mt_rand(1, 65536); |
|
// error_log("[VarKey]".$key,4); |
|
return $key; |
|
} |
|
|
|
/** |
|
* Aloc shared memory to store informations between main process and fork. |
|
* @param mixed $value |
|
* @return int type |
|
*/ |
|
final private function alocatesharedMemory($value, $var_key = 1) { |
|
|
|
while (true) { |
|
if (!shm_has_var(self::$shmId, $var_key)) { |
|
try { |
|
shm_put_var(self::$shmId, $var_key, $value); |
|
break; |
|
} catch (\Exception $e) { |
|
error_log($e->getMessage()); |
|
break; |
|
} |
|
} else { |
|
$var_key = $this->getVar_Key(); |
|
gc_collect_cycles(); |
|
} |
|
} |
|
|
|
return $var_key; |
|
} |
|
|
|
/** |
|
* Run on both process, master and fork. |
|
* If is Main process should not clean-up the shared pointeres. |
|
*/ |
|
final public function __destruct() { |
|
//Is inside fork? |
|
if ( |
|
isset($this->var_key_fork_pid) && |
|
@shm_has_var(self::$shmId, $this->var_key_fork_pid) && |
|
shm_get_var(self::$shmId, $this->var_key_fork_pid) == getmypid() |
|
) { |
|
$this->onDestroy(true); //Trigger onDestroy |
|
$this->log("Destroy: " . getmypid()); //Record a log |
|
// Try to avoids PHP Warning: "shm_attach" failed for key No space left on device in |
|
shm_detach(self::$shmId); |
|
shm_remove(self::$shmId); |
|
gc_collect_cycles(); |
|
posix_kill(getmypid(), SIGKILL); |
|
}else{ |
|
$this->onDestroy(false); |
|
} |
|
} |
|
|
|
/** |
|
* Run the task |
|
* @param array $parameters |
|
*/ |
|
final public function run($parameters) { |
|
try { |
|
//Async Tasks may run for a long time, and can't be destroyed if user go out. |
|
ignore_user_abort(true); |
|
set_time_limit(0); |
|
|
|
// Start signal Handler |
|
pcntl_signal(SIGCHLD, SIG_IGN); |
|
$pid = pcntl_fork(); //Fork it into a new process |
|
|
|
//This code above will run in paralle on master and fork process |
|
if ($pid == -1) { |
|
throw new \RuntimeException("Could not start a fork process."); |
|
} else if ($pid) { //Master process |
|
/** |
|
Suspends execution of the current process until a child as specified by the pid argument has exited, |
|
or until a signal is delivered whose action is to terminate the current process or to call a signal handling function. |
|
If a child as requested by pid has already exited by the time of the call (a so-called "zombie" process), |
|
the function returns immediately. |
|
**/ |
|
// pcntl_waitpid($pid, $exitcode, WNOHANG); // prevents zombie processes |
|
return; |
|
} else { //Frok process |
|
ignore_user_abort(); //I dont know why but must be set again. |
|
$sid = posix_setsid(); |
|
self::$shmId = shm_attach((int) (ftok(self::$file, 'A') . self::$line), self::SHARED_MEMORY_SIZE); |
|
$this->var_key_fork_pid = $this->alocatesharedMemory($sid, 112105100); //112105100 |
|
shm_put_var(self::$shmId, $this->var_key_status, 'RUNNING'); |
|
// $this->log("SubTask" . $sid); |
|
// $this->log("SubPid" . $this->var_key_fork_pid); |
|
// $this->log("Substatus" . $this->var_key_status); |
|
$this->onPreExecute($parameters); //Used to validate things |
|
// $this->log("SessionDelay: " . session_id()); |
|
|
|
try { |
|
$result = $this->doInBackground($parameters); //Run the task |
|
} catch (\Exception $e) { |
|
$this->log("[ErrExec:" . $e->getCode() . "] " . $e->getMessage()); |
|
$this->cancel($e->getMessage()); |
|
} |
|
|
|
$this->onPostExecute($result); //Send the event. |
|
//Clean up the shared memory and kill the process. |
|
if (@shm_has_var(self::$shmId, $this->var_key_fork_pid)) { |
|
shm_put_var(self::$shmId, $this->var_key_fork_pid, null); |
|
shm_put_var(self::$shmId, $this->var_key_status, 'FINISHED'); |
|
shm_detach(self::$shmId); |
|
shm_remove(self::$shmId); |
|
gc_collect_cycles(); |
|
posix_kill(getmypid(), SIGKILL); |
|
} |
|
} |
|
} catch (\Exception $e) { |
|
$msg = "[Err:" . $e->getCode() . "] " . $e->getMessage(); |
|
$this->log($msg); |
|
self::onError($msg,$e); |
|
} |
|
} |
|
|
|
/** |
|
* Attempts to cancel execution of this task |
|
* @return boolean |
|
*/ |
|
final public function cancel($reason = "Unknow") { |
|
if ( |
|
isset($this->var_key_fork_pid) && |
|
@shm_has_var(self::$shmId, $this->var_key_fork_pid) && |
|
is_null(shm_get_var(self::$shmId, $this->var_key_fork_pid)) === false |
|
) { |
|
$this->onCancelled(); |
|
$this->log("Tread Canceled Reason: " . $reason); |
|
$pid = shm_get_var(self::$shmId, $this->var_key_fork_pid); |
|
shm_put_var(self::$shmId, $this->var_key_fork_pid, null); |
|
shm_put_var(self::$shmId, $this->var_key_status, 'CANCELED'); |
|
shm_detach(self::$shmId); |
|
shm_remove(self::$shmId); |
|
posix_kill($pid, SIGKILL); |
|
return true; |
|
} else { |
|
return false; |
|
} |
|
} |
|
|
|
/** |
|
* Returns the current status of this task |
|
* @return string |
|
*/ |
|
final public function getStatus() { |
|
return @shm_has_var(self::$shmId, $this->var_key_status) ? shm_get_var(self::$shmId, $this->var_key_status) : 'PENDING'; |
|
} |
|
|
|
/** |
|
* Returns true if this task was cancelled before it completed normally |
|
* @return boolean |
|
*/ |
|
final public function isCancelled() { |
|
return @shm_has_var(self::$shmId, $this->var_key_status) == 'CANCELED' ? true : false; |
|
} |
|
|
|
/** |
|
* Runs on the thread before doInBackground($parameters) |
|
* @return void |
|
*/ |
|
protected function onPreExecute($parameters) { |
|
|
|
} |
|
|
|
/** |
|
* Override this method to perform a computation on a background thread |
|
* @param mixed $parameters |
|
* @return mixed |
|
*/ |
|
abstract protected function doInBackground($parameters); |
|
|
|
/** |
|
* Runs on the thread after doInBackground($parameters) |
|
* @param mixed $result |
|
*/ |
|
protected function onPostExecute($result) { |
|
|
|
} |
|
|
|
/** |
|
* Runs on the thread after cancel() |
|
* @return void |
|
*/ |
|
protected function onCancelled() { |
|
|
|
} |
|
|
|
/** |
|
* Runs on the thread before destroy class instance |
|
* @param bool $isfork Defin if its inside thread fork or not. |
|
*/ |
|
protected function onDestroy($isfork) { |
|
|
|
} |
|
|
|
|
|
/** |
|
* Runs on the thread error |
|
* @param string $msg |
|
* @param \Exception $exception |
|
*/ |
|
protected function onError($msg, $exception) { |
|
|
|
} |
|
final public function getFullLog(){ |
|
return $this->_log; |
|
} |
|
final public function log($msg) { |
|
$name = str_replace(['Application\Tasks\Async\\'], [''], self::$name); |
|
$filename = self::$_logdir . $name . ".log"; |
|
$date = date("Y-m-d H:i:s"); |
|
$pid = getmypid(); |
|
$msg = "[{$date}(Task {$name}:{$pid})]{$msg}" . PHP_EOL; |
|
$this->_log .= $msg; |
|
error_log($msg, 3, $filename); |
|
} |
|
|
|
final public static function FatalErrorCatch() { |
|
try { |
|
$error = error_get_last(); |
|
$byPass = array(E_RECOVERABLE_ERROR, |
|
E_WARNING, |
|
E_PARSE, |
|
E_NOTICE, |
|
E_STRICT, |
|
E_DEPRECATED); |
|
|
|
if ($error !== NULL && is_array($error) && array_key_exists('type', $error) && !in_array($error['type'], $byPass)) { |
|
$pid = getmypid(); |
|
$date = date(DATE_ATOM); |
|
$msg = "[$date][Thread Error]: " . $error["message"] . " in " . $error["file"] . "(" . $error["line"] . ")" . PHP_EOL; |
|
$trace = array_reverse(debug_backtrace()); |
|
array_pop($trace); |
|
foreach ($trace as $item) { |
|
$msg .= (isset($item['file']) ? $item['file'] : '<unknown file>') . ' ' . (isset($item['line']) ? $item['line'] : '<unknown line>') . ' calling ' . $item['function'] . '()' . PHP_EOL; |
|
} |
|
error_log($msg, 3, self::$_logdir . "{$pid}_error.log"); |
|
$sys = new \Exception($msg, 500, null); |
|
self::onError($msg, $sys); |
|
} |
|
} catch (\Exception $e) { |
|
$date = date(DATE_ATOM); |
|
error_log("[$date][Thread Fatal Error]" . $e->getMessage() . " in " . $e->getFile() . "[" . $e->getLine() . "]" . PHP_EOL . $e->getTraceAsString(), 3, self::$_logdir . "error.log"); |
|
} |
|
} |
|
|
|
} |