Skip to content

Instantly share code, notes, and snippets.

@forthxu
Created April 20, 2023 07:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forthxu/446260e3a4ae5fd446b59c847ab27b7a to your computer and use it in GitHub Desktop.
Save forthxu/446260e3a4ae5fd446b59c847ab27b7a to your computer and use it in GitHub Desktop.
swoole-1.8.5 多进程任务处理
<?php
/**
 * Multi-process task server built on swoole_process.
 *
 * The master process forks a fixed number of worker processes and exchanges
 * data with each of them through the worker's process pipe ($process->pipe).
 * A task written by the master is handed to the onTask callback inside a
 * worker; whatever that callback returns is written back and delivered to the
 * onFinish callback inside the master.
 *
 * Usage:
 *
 *     $taskServer = new \TaskServer(
 *         100, // number of worker processes
 *         function ($taskServer) { // onMaster: runs once in the master process
 *             // demo: push a task to a random worker every 100 ms
 *             swoole_timer_tick(100, function () use ($taskServer) {
 *                 $taskServer->task("hello");
 *             });
 *         },
 *         function ($data) { // onTask: runs in a worker; the return value is sent back
 *             return $data . ' world';
 *         },
 *         function ($data, $pid) { // onFinish: runs in the master with the worker's reply
 *             // Callbacks are invoked as plain callables, so $this is NOT bound
 *             // to the TaskServer here; log through your own means.
 *             echo $pid . ': ' . $data . PHP_EOL;
 *         }
 *     );
 *     $taskServer->start();
 */
class TaskServer extends \Swoole\SwooleAbstract
{
    /** @var int Number of worker processes the master tries to keep alive. */
    protected $_workerNum;

    /** @var callable|null Called once in the master as fn(TaskServer $server). */
    protected $_onMaster;

    /** @var callable|null Called in a worker as fn($data); its return value is the reply. */
    protected $_onTask;

    /** @var callable|null Called in the master as fn($data, $workerPid). */
    protected $_onFinish;

    /** @var int|null PID of the master process, recorded by start(). */
    protected $_pid;

    /** @var int[] Timer ids created by the master, cleared on shutdown. */
    protected $_timerIds = [];

    /** @var array Live workers keyed by PID (pid => swoole_process). */
    protected $_workers = [];

    /**
     * @param int           $workNum  Number of worker processes to maintain.
     * @param callable|null $onMaster Master-side start hook, receives this server.
     * @param callable|null $onTask   Worker-side task handler, receives the raw data.
     * @param callable|null $onFinish Master-side reply handler, receives (data, worker pid).
     */
    public function __construct($workNum = 3, $onMaster = null, $onTask = null, $onFinish = null)
    {
        $this->_workerNum = $workNum;
        $this->_onMaster = $onMaster;
        $this->_onTask = $onTask;
        $this->_onFinish = $onFinish;
    }

    /**
     * Turn the current process into the master: fork the workers, install the
     * supervision timer and the SIGTERM handler, then run the onMaster hook.
     *
     * Any \Exception raised during setup terminates the script via die().
     *
     * @return void
     */
    public function start()
    {
        try {
            // Daemon mode is left disabled; to enable it call
            // swoole_process::daemon(false, true) here.

            $pid = getmypid();
            $this->_pid = $pid;
            $this->log('master pid:' . $pid . ' start.');
            swoole_set_process_name("taskServer: master {$pid}");

            // Initial worker pool.
            for ($i = 0; $i < $this->_workerNum; $i++) {
                $this->createProcess();
            }

            // Once per second: reap dead workers and refill the pool.
            $this->_timerIds[] = swoole_timer_tick(1000, function () {
                $this->superviseWorkers();
            });

            // Graceful shutdown of the whole process tree.
            swoole_process::signal(SIGTERM, function ($signo) {
                $this->shutdownMaster($signo);
            });

            // User hook executed in the master process.
            if (is_callable($this->_onMaster)) {
                $fun = $this->_onMaster;
                $fun($this);
            }
        } catch (\Exception $e) {
            die('server error: ' . $e->getMessage());
        }
    }

    /**
     * Send a task to one randomly chosen worker.
     *
     * @param string $data Payload written to the worker's pipe.
     *
     * @return bool True when the write succeeded, false when no worker is
     *              available or the write failed.
     */
    public function task($data)
    {
        // array_rand() cannot be called on an empty array.
        if (empty($this->_workers)) {
            $this->log("write fail: no worker available");
            return false;
        }

        $processKey = array_rand($this->_workers);
        $process = $this->_workers[$processKey];
        $result = $process->write($data);
        if (!$result) {
            $this->log("write fail pid: " . $process->pid);
            return false;
        }

        return true;
    }

    /**
     * Forget a worker: stop listening on its pipe, close it and drop it from
     * the worker table. Unknown PIDs are ignored.
     *
     * @param int $pid PID of the worker to remove.
     *
     * @return void
     */
    protected function destryProcess($pid)
    {
        // A reaped PID may not be tracked here; there is nothing to clean up.
        if (!isset($this->_workers[$pid])) {
            return;
        }

        $process = $this->_workers[$pid];
        swoole_event_del($process->pipe); // stop listening on the pipe
        $process->close();
        unset($this->_workers[$pid]); // remove from the worker table
    }

    /**
     * Fork one worker process and register it with the master.
     *
     * Inside the worker the pipe is watched for incoming tasks, a one-second
     * timer exits the worker when the master is gone, and SIGTERM triggers a
     * clean exit. Inside the master the pipe is watched for replies.
     *
     * @return int PID of the new worker, or 0 when the process could not be started.
     */
    protected function createProcess()
    {
        $process = new swoole_process(function (swoole_process $process) {
            $this->log('worker pid:' . $process->pid . ' start.');
            $process->name("taskServer: worker {$process->pid}");

            // Worker side: a task arrived from the master.
            swoole_event_add($process->pipe, function ($pipe) use ($process) {
                $this->onTask($pipe, $process);
            });

            // Follow the master: signal 0 only probes whether the PID exists.
            swoole_timer_tick(1000, function () {
                if (!swoole_process::kill($this->_pid, 0)) {
                    $pid = getmypid();
                    $this->log("master process exist " . $this->_pid . " worker process exist " . $pid);
                    exit();
                }
            });

            // NOTE(review): SIGSEGV is only logged here and the worker is not
            // terminated by this handler - confirm that this is intended.
            swoole_process::signal(SIGSEGV, function ($signo) {
                $pid = getmypid();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
            });

            // Worker side: clean exit on SIGTERM.
            swoole_process::signal(SIGTERM, function ($signo) use ($process) {
                $pid = getmypid();
                swoole_event_del($process->pipe);
                $process->close();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
                $process->exit();
                exit();
            });
        });

        $pid = $process->start();
        if ($pid < 1) {
            $this->log("worker process create fail");
            return 0;
        }

        // Track the worker in the master.
        $this->_workers[$pid] = $process;

        // Master side: a reply arrived from this worker.
        swoole_event_add($process->pipe, function ($pipe) use ($process) {
            $this->onFinish($pipe, $process);
        });

        return $pid;
    }

    /**
     * Worker-side pipe handler: read one task, run the onTask callback and
     * write its non-empty result back to the master.
     *
     * @param mixed          $pipe    Pipe descriptor passed in by the event loop (unused).
     * @param swoole_process $process The worker's own process object.
     *
     * @return void
     */
    protected function onTask($pipe, $process)
    {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 2");
            return; // nothing to read from
        }

        $data = $process->read();
        // A failed or empty read carries no task (presumably the peer end of
        // the pipe is gone); do not hand it to the user callback.
        if ($data === false || $data === '') {
            return;
        }

        $result = '';
        if (is_callable($this->_onTask)) {
            $fun = $this->_onTask;
            $result = $fun($data);
        }

        // Only values with a string form can be written to the pipe; anything
        // else (null, arrays, plain objects) is treated as "no reply".
        if (is_scalar($result) || (is_object($result) && method_exists($result, '__toString'))) {
            $result = (string) $result;
        } else {
            $result = '';
        }

        if ($result !== '') {
            $process->write($result);
        }
    }

    /**
     * Master-side pipe handler: read one reply from the given worker and pass
     * it to the onFinish callback together with the worker's PID.
     *
     * @param mixed          $pipe    Pipe descriptor passed in by the event loop (unused).
     * @param swoole_process $process The worker the reply came from.
     *
     * @return void
     */
    protected function onFinish($pipe, $process)
    {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 1");
            return; // nothing to read from
        }

        $data = $process->read();
        // A failed or empty read carries no reply (presumably the worker end
        // of the pipe is gone); do not hand it to the user callback.
        if ($data === false || $data === '') {
            return;
        }

        if (is_callable($this->_onFinish)) {
            $fun = $this->_onFinish;
            $fun($data, $process->pid);
        }
    }

    /**
     * Periodic master-side housekeeping: reap every worker that has exited,
     * then fork replacements until the pool is back at its configured size.
     *
     * @return void
     */
    private function superviseWorkers()
    {
        // Non-blocking wait: collect all children that have already exited.
        while ($ret = swoole_process::wait(false)) {
            $pid = $ret['pid'];
            $this->destryProcess($pid);
            $this->log("worker down wait worker process exist:{$ret['signal']} pid:" . $pid);
        }

        // Refill the pool; stop early when forking fails and retry next tick.
        $missing = $this->_workerNum - count($this->_workers);
        for ($i = 0; $i < $missing; $i++) {
            if ($this->createProcess() < 1) {
                break;
            }
        }
    }

    /**
     * Master SIGTERM handler: stop the timers, ask every worker to terminate,
     * wait for them to exit and then end the master process.
     *
     * @param int $signo Signal number that triggered the shutdown.
     *
     * @return void This method ends the process via exit().
     */
    private function shutdownMaster($signo)
    {
        $pid = getmypid();
        $this->log("master process exist rev:{$signo} pid:" . $pid);

        // Stop all master timers so no new workers are forked during shutdown.
        foreach ($this->_timerIds as $timerId) {
            swoole_timer_clear($timerId);
        }
        $this->_timerIds = [];

        // Ask every worker to terminate.
        foreach (array_keys($this->_workers) as $workerPid) {
            swoole_process::kill($workerPid, SIGTERM);
        }

        // Block until each worker has been reaped instead of spinning on a
        // non-blocking wait.
        while (($count = count($this->_workers)) > 0) {
            $this->log("wait worker num: " . $count);
            $ret = swoole_process::wait(true);
            if (!$ret) {
                // No child left to wait for; any remaining table entries are stale.
                break;
            }
            $pid = $ret['pid'];
            $this->destryProcess($pid);
            $this->log("master down wait worker process exist:{$ret['signal']} pid:" . $pid);
        }

        exit();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment