Skip to content

Instantly share code, notes, and snippets.

@LeonanCarvalho
Last active May 24, 2021 21:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LeonanCarvalho/62c6fe0b62db8a478f502f84c5734c83 to your computer and use it in GitHub Desktop.
Save LeonanCarvalho/62c6fe0b62db8a478f502f84c5734c83 to your computer and use it in GitHub Desktop.
PHP Threads
<?php
namespace Application\Tasks;
/**
* AsyncTaskAbstract forks a PHP process running under PHP-FPM in order to execute asynchronous tasks.
* (Not tested with php-cgi; it will probably not work with Apache mod_php or a single-worker PHP setup.)
* @requirement PCNTL Process Control http://php.net/manual/pt_BR/book.pcntl.php
* @requirement POSIX http://php.net/manual/pt_BR/book.posix.php
* @requirement Semaphore, Shared Memory and IPC http://php.net/manual/pt_BR/book.sem.php
* @note Will not work on Windows. Use pthreads instead: http://php.net/manual/pt_BR/pthreads.installation.php
* @author Leonan Carvalho <j.leonancarvalho@gmail.com>
*/
abstract class AsyncTaskAbstract {

    /**
     * Size, in bytes, requested for the shared memory segment.
     */
    const SHARED_MEMORY_SIZE = 1024;

    /**
     * Handle of the shared memory segment, as returned by shm_attach().
     * Static: every instance in the same process shares (and overwrites) it.
     * @var resource|\SysvSharedMemory|false|null
     */
    private static $shmId;

    /**
     * Random number appended to the ftok() value to build the segment key.
     * @var int
     */
    protected static $line;

    /**
     * File that declares the concrete task class; used as the ftok() path.
     * @var string
     */
    protected static $file;

    /**
     * Fully-qualified name of the concrete task class.
     * @var string
     */
    protected static $name;

    /**
     * Directory prefix for log files. It is concatenated directly with the
     * file name, so it is expected to end with a directory separator.
     * @var string
     */
    protected static $_logdir;

    /**
     * Shared-memory variable key under which the main process PID is stored.
     * @var int
     */
    private $var_key_pid;

    /**
     * Shared-memory variable key under which the task status is stored.
     * @var int
     */
    private $var_key_status;

    /**
     * Shared-memory variable key under which the forked process id is stored.
     * Only set inside the forked process.
     * @var int
     */
    private $var_key_fork_pid;

    /**
     * Every message passed to log() by this instance, concatenated.
     * @var string
     */
    private $_log = "";

    /**
     * Creates a new async task: checks the runtime requirements, attaches the
     * shared memory segment, stores the main PID and the initial 'PENDING'
     * status in it and registers the fatal-error shutdown handler.
     *
     * @throws \RuntimeException when a requirement is missing or the shared
     *                           memory segment cannot be attached.
     */
    final public function __construct() {
        try {
            gc_enable();
            $error = [];
            if (version_compare(PHP_VERSION, '5.6.0', '<') || defined('HHVM_VERSION')) {
                $error[] = "AsyncTask only supports PHP 5.6.0 and above";
            }
            if (!extension_loaded('pcntl')) {
                $error[] = "Extension \"pcntl\" is required.";
            }
            if (!extension_loaded('posix')) {
                $error[] = "Extension \"posix\" is required";
            }
            // The shm_* functions used throughout this class live in sysvshm.
            if (!extension_loaded('sysvshm')) {
                $error[] = "Extension \"sysvshm\" is required";
            }
            if (count($error) > 0) {
                // The header introduces the list, so it has to come first.
                array_unshift($error, "Please fix errors bellow to use AsynTask");
                // Leading backslash is mandatory: this class is namespaced.
                throw new \RuntimeException(implode(PHP_EOL, $error));
            }

            self::$line = mt_rand(1, 65536);
            $reflect = new \ReflectionClass($this);
            self::$file = $reflect->getFileName();
            self::$name = $reflect->getName();

            // Only create the log directory when one has been configured.
            if (!empty(self::$_logdir) && !is_dir(self::$_logdir)) {
                mkdir(self::$_logdir, 0777, true);
            }

            // Create the shared memory segment.
            self::$shmId = shm_attach(self::sharedMemoryKey(), self::SHARED_MEMORY_SIZE);
            if (self::$shmId === false) {
                throw new \RuntimeException("Could not attach the shared memory segment.");
            }
            $this->var_key_pid = $this->alocatesharedMemory(getmypid(), 112112105100);
            $this->var_key_status = $this->alocatesharedMemory('PENDING', 11511697116117115);

            // Pass the instance along so the static handler can reach onError().
            register_shutdown_function(array(__CLASS__, 'FatalErrorCatch'), $this);
        } catch (\Exception $e) {
            // Surface the failure instead of silently discarding it.
            throw new \RuntimeException("Task Init Fails: " . $e->getMessage(), (int) $e->getCode(), $e);
        }
    }

    /**
     * Builds the integer key of the shared memory segment from the ftok()
     * value of the task file concatenated with the random line number.
     * @return int
     */
    private static function sharedMemoryKey() {
        return (int) (ftok(self::$file, 'A') . self::$line);
    }

    /**
     * Generates a random candidate for a shared-memory variable key.
     * @return int
     */
    private function getVar_Key() {
        return (int) mt_rand(1, 65536);
    }

    /**
     * Stores a value in shared memory so it can be exchanged between the main
     * process and the fork. If the preferred key is already taken, random keys
     * are tried until a free one is found.
     *
     * @param mixed $value   Value to store.
     * @param int   $var_key Preferred variable key.
     * @return int The variable key that was actually used.
     */
    private function alocatesharedMemory($value, $var_key = 1) {
        while (shm_has_var(self::$shmId, $var_key)) {
            $var_key = $this->getVar_Key();
            gc_collect_cycles();
        }
        try {
            shm_put_var(self::$shmId, $var_key, $value);
        } catch (\Exception $e) {
            error_log($e->getMessage());
        }
        return $var_key;
    }

    /**
     * Runs in both processes. Inside the fork it triggers onDestroy(true),
     * releases the shared memory segment and kills the process; anywhere else
     * it only triggers onDestroy(false) and leaves the shared memory alone.
     */
    final public function __destruct() {
        $insideFork = isset($this->var_key_fork_pid)
            && @shm_has_var(self::$shmId, $this->var_key_fork_pid)
            && (int) shm_get_var(self::$shmId, $this->var_key_fork_pid) === getmypid();

        if (!$insideFork) {
            $this->onDestroy(false);
            return;
        }

        $this->onDestroy(true);
        $this->log("Destroy: " . getmypid());
        // Releasing the segment tries to avoid the PHP warning
        // "shm_attach failed for key ... No space left on device".
        shm_detach(self::$shmId);
        shm_remove(self::$shmId);
        gc_collect_cycles();
        posix_kill(getmypid(), SIGKILL);
    }

    /**
     * Forks the current process and runs the task in the child.
     * The main process returns immediately after the fork. The child runs
     * onPreExecute(), doInBackground() and onPostExecute(), marks the task as
     * 'FINISHED', releases the shared memory and kills itself.
     *
     * Exceptions are logged and reported through onError(); note that when
     * this happens inside the child, control returns to the caller afterwards.
     *
     * @param mixed $parameters Passed to onPreExecute() and doInBackground().
     * @return void
     */
    final public function run($parameters) {
        try {
            // Async tasks may run for a long time and must survive the user leaving.
            ignore_user_abort(true);
            set_time_limit(0);
            // Ignoring SIGCHLD lets the system reap the child (no zombie process).
            pcntl_signal(SIGCHLD, SIG_IGN);
            $pid = pcntl_fork();
            // From here on the code runs in parallel in the main process and in the fork.
            if ($pid == -1) {
                throw new \RuntimeException("Could not start a fork process.");
            }
            if ($pid) {
                // Main process: do not wait for the child.
                return;
            }

            // Forked process.
            ignore_user_abort(); // Original author's note: must be set again in the fork.
            $sid = posix_setsid();
            self::$shmId = shm_attach(self::sharedMemoryKey(), self::SHARED_MEMORY_SIZE);
            $this->var_key_fork_pid = $this->alocatesharedMemory($sid, 112105100);
            shm_put_var(self::$shmId, $this->var_key_status, 'RUNNING');

            $this->onPreExecute($parameters);

            // Defined up front so onPostExecute() never receives an undefined variable.
            $result = null;
            try {
                $result = $this->doInBackground($parameters);
            } catch (\Exception $e) {
                $this->log("[ErrExec:" . $e->getCode() . "] " . $e->getMessage());
                $this->cancel($e->getMessage());
            }
            $this->onPostExecute($result);

            // Clean up the shared memory and kill the process.
            if (@shm_has_var(self::$shmId, $this->var_key_fork_pid)) {
                shm_put_var(self::$shmId, $this->var_key_fork_pid, null);
                shm_put_var(self::$shmId, $this->var_key_status, 'FINISHED');
                shm_detach(self::$shmId);
                shm_remove(self::$shmId);
                gc_collect_cycles();
                posix_kill(getmypid(), SIGKILL);
            }
        } catch (\Exception $e) {
            $msg = "[Err:" . $e->getCode() . "] " . $e->getMessage();
            $this->log($msg);
            $this->onError($msg, $e);
        }
    }

    /**
     * Attempts to cancel execution of this task: triggers onCancelled(), marks
     * the task as 'CANCELED', releases the shared memory and kills the process
     * whose id is stored under the fork key.
     *
     * @param string $reason Text written to the log.
     * @return boolean false when no fork id is registered for this instance.
     */
    final public function cancel($reason = "Unknow") {
        $hasFork = isset($this->var_key_fork_pid)
            && @shm_has_var(self::$shmId, $this->var_key_fork_pid)
            && shm_get_var(self::$shmId, $this->var_key_fork_pid) !== null;

        if (!$hasFork) {
            return false;
        }

        $this->onCancelled();
        $this->log("Tread Canceled Reason: " . $reason);
        $pid = shm_get_var(self::$shmId, $this->var_key_fork_pid);
        shm_put_var(self::$shmId, $this->var_key_fork_pid, null);
        shm_put_var(self::$shmId, $this->var_key_status, 'CANCELED');
        shm_detach(self::$shmId);
        shm_remove(self::$shmId);
        posix_kill($pid, SIGKILL);
        return true;
    }

    /**
     * Returns the current status of this task as stored in shared memory,
     * or 'PENDING' when no status variable is available.
     * @return string
     */
    final public function getStatus() {
        return @shm_has_var(self::$shmId, $this->var_key_status)
            ? shm_get_var(self::$shmId, $this->var_key_status)
            : 'PENDING';
    }

    /**
     * Returns true if this task was cancelled before it completed normally.
     * @return boolean
     */
    final public function isCancelled() {
        // Compare the stored status itself, not the boolean "variable exists" flag.
        return $this->getStatus() === 'CANCELED';
    }

    /**
     * Hook called in the forked process before doInBackground($parameters).
     * @param mixed $parameters
     * @return void
     */
    protected function onPreExecute($parameters) {
    }

    /**
     * Override this method to perform the work in the forked process.
     * @param mixed $parameters
     * @return mixed
     */
    abstract protected function doInBackground($parameters);

    /**
     * Hook called in the forked process after doInBackground($parameters).
     * @param mixed $result Value returned by doInBackground(), or null if it threw.
     * @return void
     */
    protected function onPostExecute($result) {
    }

    /**
     * Hook called by cancel() right before the task is killed.
     * @return void
     */
    protected function onCancelled() {
    }

    /**
     * Hook called from the destructor of the instance.
     * @param bool $isfork true when running inside the forked process.
     * @return void
     */
    protected function onDestroy($isfork) {
    }

    /**
     * Hook called when run() catches an exception or when the shutdown
     * handler detects an error.
     * @param string $msg
     * @param \Exception $exception
     * @return void
     */
    protected function onError($msg, $exception) {
    }

    /**
     * Returns everything this instance has logged so far.
     * @return string
     */
    final public function getFullLog() {
        return $this->_log;
    }

    /**
     * Appends a timestamped message to the in-memory log and to the log file
     * of the task class inside the log directory.
     * @param string $msg
     * @return void
     */
    final public function log($msg) {
        $name = str_replace(['Application\Tasks\Async\\'], [''], self::$name);
        $filename = self::$_logdir . $name . ".log";
        $date = date("Y-m-d H:i:s");
        $pid = getmypid();
        $msg = "[{$date}(Task {$name}:{$pid})]{$msg}" . PHP_EOL;
        $this->_log .= $msg;
        error_log($msg, 3, $filename);
    }

    /**
     * Shutdown handler: when the last recorded error is not one of the
     * by-passed (non-fatal) types, writes it with a backtrace to
     * "<pid>_error.log" and reports it to the task through onError().
     *
     * @param AsyncTaskAbstract|null $task Task to notify. Optional so existing
     *                                     argument-less calls keep working.
     * @return void
     */
    final public static function FatalErrorCatch($task = null) {
        try {
            $error = error_get_last();
            $byPass = [
                E_RECOVERABLE_ERROR,
                E_WARNING,
                E_PARSE,
                E_NOTICE,
                E_STRICT,
                E_DEPRECATED,
            ];
            if (!is_array($error) || !array_key_exists('type', $error) || in_array($error['type'], $byPass, true)) {
                return;
            }

            $pid = getmypid();
            $date = date(DATE_ATOM);
            $msg = "[$date][Thread Error]: " . $error["message"] . " in " . $error["file"] . "(" . $error["line"] . ")" . PHP_EOL;
            $trace = array_reverse(debug_backtrace());
            array_pop($trace); // Drop the frame of this handler itself.
            foreach ($trace as $item) {
                $msg .= (isset($item['file']) ? $item['file'] : '<unknown file>') . ' ' . (isset($item['line']) ? $item['line'] : '<unknown line>') . ' calling ' . $item['function'] . '()' . PHP_EOL;
            }
            error_log($msg, 3, self::$_logdir . "{$pid}_error.log");

            // onError() is an instance hook and cannot be called statically.
            if ($task instanceof self) {
                $task->onError($msg, new \Exception($msg, 500, null));
            }
        } catch (\Exception $e) {
            $date = date(DATE_ATOM);
            error_log("[$date][Thread Fatal Error]" . $e->getMessage() . " in " . $e->getFile() . "[" . $e->getLine() . "]" . PHP_EOL . $e->getTraceAsString(), 3, self::$_logdir . "error.log");
        }
    }
}
<?php
// Demo: creates a temporary file, starts a DelayedUnlink task that deletes it
// after a delay, and polls the task status until the task is over.
require 'AsyncTaskAbstract.php';
require 'DelayedUnlink.php';

// These settings only exist so that output can be shown before the script continues.
ini_set('output_buffering', 'off');
ini_set("zlib.output_compression", false);
set_time_limit(0);
ini_set('auto_detect_line_endings', 1);
while (@ob_end_flush());
ini_set('implicit_flush', true);
ob_implicit_flush(true);
gc_enable();

try {
    echo "<pre>";
    // Create the file that will be deleted.
    $file = tempnam("/tmp/", "teste_");
    echo $file,file_exists($file),PHP_EOL;
    flush();
    sleep(1); // Simulates some earlier work.

    // Initialise the task object.
    $task = new Application\Tasks\Async\DelayedUnlink();
    $task->setDelay(10); // Configure it: here the file is deleted after 10 seconds.
    echo "[",date(DateTime::ATOM),"] Start",PHP_EOL;
    echo "[",date(DateTime::ATOM),"] Status: ",$task->getStatus(),PHP_EOL;
    $task->run($file); // Start the task.

    // Keep polling the task status.
    while (true) {
        if (connection_status() != CONNECTION_NORMAL || connection_aborted()) {
            break;
        }
        echo $file,file_exists($file),PHP_EOL;
        // Read the status once so the printed value is the one that is tested.
        $status = $task->getStatus();
        echo $status,PHP_EOL;
        // A cancelled task never reaches FINISHED; stop polling in both cases.
        if (in_array($status, ["FINISHED", "CANCELED"], true)) {
            break;
        }
        sleep(1);
    }
} catch (\Exception $e) {
    var_dump($e);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment