Skip to content

Instantly share code, notes, and snippets.

@avtobys
Last active October 5, 2023 12:13
Show Gist options
  • Save avtobys/63ec970d7b6a3952486b88bcffa39596 to your computer and use it in GitHub Desktop.
Save avtobys/63ec970d7b6a3952486b88bcffa39596 to your computer and use it in GitHub Desktop.
Demon PHP
<?php
/************************ настройки ************************/
const MIN_PROCESSES = 30; // минимальное количество одновременно исполняемых процессов
const MAX_PROCESSES = 30; // максимальное количество одновременно исполняемых процессов
const MAX_ITERATIONS = 1000; // максимально возможное количество итераций(выполнений), 0 = неограниченно
const MAX_SYSLOAD_PERCENT = 90; // максимальный процент нагрузки системы для запуска скрипта
$LOG_FILE = __DIR__ . '/' . basename(__FILE__, '.php') . '.log'; // для отключения логгирования просто замените на /dev/null
$LOCK_FILENAME = '/tmp/' . md5(__FILE__) . '.lock';
const DB_NAME = 'my_db';
const DB_HOST = 'localhost';
const DB_PORT = 3306;
const DB_USER = 'XXXX';
const DB_PASSWORD = 'XXXX';
const DB_CHARSET = 'utf8mb4';
if (!in_array(PHP_SAPI, array('cli', 'cli-server', 'phpdbg'))) { // только для CLI использования
echo "Only CLI mode is supported\n";
exit(1);
}
/******* управление - старт / стоп / рестарт демона ********/
if (isset($argv[1]) && $argv[1] == 'stop') {
$attempts = 0;
$killed = 0;
do {
exec('pgrep -a php$ | grep -F "' . __FILE__ . '"', $output, $result_code);
$pids = [];
if ($output && ($pids = array_map(function ($item) {
return explode(' ', $item)[0];
}, array_filter($output, function ($item) {
return !preg_match('/^[\d]+.+(stop|restart)$/', $item);
})))) {
$killed += count($pids);
exec('kill ' . implode(' ', $pids) . ' > /dev/null 2>&1');
}
unset($output, $result_code);
if ($attempts >= 10 && empty($pids) && $killed) {
@unlink($LOCK_FILENAME);
echo "Success: all demon processes($killed) killed\n";
exit(0);
}
usleep(300000);
} while ($attempts++ < 100);
if (!$killed) {
echo "Error: demon not running. Processes not found...\n";
exit(1);
}
echo "Error: can't kill all demon processes\n";
exit(1);
}
if ((isset($argv[1]) && $argv[1] == 'start')) {
usleep(500000);
if (intval(shell_exec('pgrep -a php$ | grep -F "' . __FILE__ . '" | wc -l 2>/dev/null')) > 1) {
echo "Error: demon already running\n";
exit(1);
}
if (file_exists($LOCK_FILENAME)) {
$attempts = 0;
do {
usleep(200000);
exec('pgrep -a php$ | grep -F "' . __FILE__ . '"', $output, $result_code);
if ($output && ($pids = array_map(function ($item) {
return explode(' ', $item)[0];
}, array_filter($output, function ($item) {
return !preg_match('/^[\d]+.+(stop|restart|start|list)$/', $item);
})))) {
echo "Error: demon already running\n";
exit(1);
}
unset($output, $result_code);
} while ($attempts++ < 100);
@unlink($LOCK_FILENAME);
}
exec("export EXEC_CALL=1 && php -f " . __FILE__ . " >> $LOG_FILE 2>&1 &");
echo "Success: demon started\n";
exit(0);
}
if (isset($argv[1]) && $argv[1] == 'restart') {
exec("php -f " . __FILE__ . " stop", $output);
echo implode("\n", $output) . "\n";
unset($output);
exec("php -f " . __FILE__ . " start", $output);
echo implode("\n", $output) . "\n";
exit(0);
}
if (isset($argv[1]) && $argv[1] == 'list') {
exec('pgrep -a php$ | grep -F "' . __FILE__ . '"', $output, $result_code);
$output = array_filter($output, function ($item) {
return !preg_match('/^[\d]+.+(stop|start|restart|list)$/', $item);
});
array_unshift($output, "Current demon processes: " . count($output) . "\n");
echo implode("\n", $output) . "\n";
exit(0);
}
if ($argv[0] != __FILE__ || (isset($argv[1]) && $argv[1] == 'help') || !getenv('EXEC_CALL')) {
echo "\nUsage: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " [start|stop|restart|help]\n\n";
echo "Example: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " start\n";
echo "Example: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " stop\n";
echo "Example: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " restart\n";
echo "Example: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " list\n";
echo "Example: php -f " . (getcwd() == __DIR__ ? '' : __DIR__ . '/') . basename(__FILE__) . " help\n\n";
exit(1);
}
/*********** контроль нагрузки на всякий пожарный **********/
if (shell_exec("echo $(nproc) $(cat /proc/loadavg | awk '{print $1}') | awk '{print $2/$1*100}'") > MAX_SYSLOAD_PERCENT) {
echo "Error: system load too high\n";
exit(1);
}
/*********** автозапуск дополнительных потоков *************/
$attempts = 0;
do {
usleep(200000);
} while ($attempts++ < 100 && !($proc = intval(shell_exec('pgrep -a php$ | grep -F "' . __FILE__ . '" | wc -l 2>/dev/null'))));
if ($proc == 0) exit(1);
if ($proc > MAX_PROCESSES) exit(1);
$file = new SplFileObject($LOCK_FILENAME, 'a+b');
$file->flock(LOCK_EX);
$file->rewind();
$iteration = (int)$file->fgets();
$iteration++;
if (MAX_ITERATIONS && $iteration > MAX_ITERATIONS) {
$file->ftruncate(0);
$file->fwrite($iteration);
$file->flock(LOCK_UN);
exit(0);
}
/************************************************************/
// тело скрипта в момент блокировки потока - код гарантированно всегда выполняемый в одном экземляре
// здесь могут быть запросы к бд, получение уникальных id для каждого потока, с обновлением статуса для этих id (в обработке и т.п.)
// try {
// $dbh = new PDO("mysql:dbname=" . DB_NAME . ";host=" . DB_HOST . ";port=" .DB_PORT . ";charset=" . DB_CHARSET, DB_USER, DB_PASSWORD, [PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_OBJ]);
// } catch (PDOException $e) {
// echo "Error: " . $e->getMessage();
// exit(1);
// }
// $id = $dbh->query("SELECT id FROM my_table WHERE status = 'new' LIMIT 1")->fetchColumn();
// $dbh->exec("UPDATE my_table SET status = 'processing' WHERE id = $id");
// $dbh = null;
/************************************************************/
if ($proc < MIN_PROCESSES) {
exec("export EXEC_CALL=1 && php -f " . __FILE__ . " >> $LOG_FILE 2>&1 &");
}
$file->ftruncate(0);
$file->fwrite($iteration);
$file->flock(LOCK_UN);
/************************************************************/
/** тело скрипта, которое необходимо выполнять в многопотоке */
usleep(rand(10000, 1000000));
echo date('M d H:i:s') . " " . getenv('USER') . " Iteration: $iteration; Current processes: $proc; Time: " . (microtime(true) - $_SERVER['REQUEST_TIME_FLOAT']) . "\n";
// обновляем статусы в бд
// try {
// $dbh = new PDO("mysql:dbname=" . DB_NAME . ";host=" . DB_HOST . ";port=" .DB_PORT . ";charset=" . DB_CHARSET, DB_USER, DB_PASSWORD, [PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_OBJ]);
// } catch (PDOException $e) {
// echo "Error: " . $e->getMessage();
// exit(1);
// }
// $dbh->exec("UPDATE my_table SET status = 'processed' WHERE id = $id");
// $dbh = null;
/*************************************************************/
/********* автозапуск следующих потоков **********************/
next_iteration:
$proc = intval(shell_exec('pgrep -a php$ | grep -F "' . __FILE__ . '" | wc -l 2>/dev/null'));
if ($proc > MAX_PROCESSES) {
exit(0);
}
$file = new SplFileObject($LOCK_FILENAME, 'a+b');
$file->flock(LOCK_EX);
$file->rewind();
$iteration = (int)$file->fgets();
if (MAX_ITERATIONS && $iteration > MAX_ITERATIONS) {
exit(0);
}
if ($proc <= MIN_PROCESSES) {
exec("export EXEC_CALL=1 && php -f " . __FILE__ . " >> $LOG_FILE 2>&1 &");
}
$file->flock(LOCK_UN);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment