背景
公司准备引入 Swoole 和 RabbitMQ 来处理业务。因此,我引入了 Hyperf 框架,并打算基于 Swoole 的多进程(进程池)来实现自定义的任务进程。
自定义启动服务封装
<?php
/**
* 进程启动服务【manager】
*/
declare(strict_types=1);
namespace App\Command;
use App\Process\BaseProcess;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
/**
 * Console command that manages the custom task-process pool (the "manager" process).
 *
 * Supported actions:
 *   php bin/hyperf.php task start      start in the foreground
 *   php bin/hyperf.php task start -d   start as a daemon
 *   php bin/hyperf.php task stop       stop the manager and its workers
 *   php bin/hyperf.php task restart    stop, then start again
 *   php bin/hyperf.php task status     report whether the manager is alive
 *
 * @Command
 */
#[Command]
class TaskProcessCommand extends HyperfCommand
{
    /** File in which the manager process stores its PID. */
    const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';

    /**
     * @var ContainerInterface
     */
    protected $container;

    /**
     * NOTE(review): framework flag inherited from the Hyperf command base class;
     * presumably false keeps handle() out of a coroutine because the pool
     * forks and blocks - confirm against the Hyperf version in use.
     */
    protected $coroutine = false;

    /**
     * Registry read by getProcess() / getAllProcess(), keyed by PID.
     * Nothing in this class writes to it yet; it is declared so that the two
     * accessors return null / [] instead of failing on an undeclared property.
     *
     * @var array
     */
    protected static $process = [];

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
        parent::__construct('task');
    }

    /**
     * Declares the "action" argument and the -d/--daemonize option.
     */
    public function configure()
    {
        parent::configure();
        $this->setDescription('自定义进程任务');
        $this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');
        $this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart/status 启动/关闭/重启/查看状态');
    }

    /**
     * Dispatches the requested action to the matching method.
     */
    public function handle()
    {
        $action = $this->input->getArgument('action');
        if ($action === 'start') {
            $this->start();
        } elseif ($action === 'stop') {
            $this->stop();
        } elseif ($action === 'restart') {
            $this->restart();
        } elseif ($action === 'status') {
            $this->status();
        } else {
            echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;
        }
    }

    /**
     * Restart: php bin/hyperf.php task restart
     */
    protected function restart()
    {
        $this->stop();
        $this->start();
    }

    /**
     * Stop: php bin/hyperf.php task stop
     *
     * Sends SIGTERM to the manager and waits for it to disappear. When the
     * wait exceeds server.settings.max_wait_time (default 10 seconds), the
     * manager's children and then the manager itself are killed with signal 9.
     */
    protected function stop()
    {
        if (!file_exists(self::MANAGER_PROCESS_PID_PATH)) {
            echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH . PHP_EOL;
            return;
        }

        $managerPid = $this->readManagerPid();
        if ($managerPid === null) {
            // An empty or corrupt file would become pid 0, and kill(0) signals
            // the caller's whole process group - never send that.
            echo "invalid manager pid file, removing: " . self::MANAGER_PROCESS_PID_PATH . PHP_EOL;
            @unlink(self::MANAGER_PROCESS_PID_PATH);
            return;
        }

        echo "stopping...\n";
        echo "kill pid $managerPid \n";

        $timeout = (int) config('server.settings.max_wait_time', 10);
        $startTime = time();
        $forceKilled = false;
        // Wide enough to blank out the "waiting..." text before it is redrawn.
        $blank = str_repeat(' ', 10) . "\r";

        @Process::kill($managerPid);
        // Signal 0 only probes whether the process still exists.
        while (@Process::kill($managerPid, 0)) {
            echo "waiting...\r";
            usleep(100000);
            echo $blank;
            echo "waiting.\r";
            usleep(100000);
            echo $blank;

            // Escalate exactly once; repeating it every iteration only spams
            // ps/xargs and can never terminate a manager that ignores SIGTERM.
            if (!$forceKilled && time() - $startTime >= $timeout) {
                echo "wait timeout, kill -9 child process, pid: $managerPid \n";
                echo (string) shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;
                echo (string) shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;
                @Process::kill($managerPid, 9);
                $forceKilled = true;
            }
        }

        @unlink(self::MANAGER_PROCESS_PID_PATH);
        echo "stopped. \n";
    }

    /**
     * Start: php bin/hyperf.php task start
     * Start as daemon: php bin/hyperf.php task start -d
     *
     * Refuses to start while the PID file points at a live process, so that
     * running the command twice does not spawn a second manager.
     */
    protected function start()
    {
        $processConfig = config('processes');
        if (!$processConfig) {
            // NOTE(review): the message names "task_process" while the code
            // reads the "processes" config key - confirm which one is intended.
            printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);
            return;
        }

        $runningPid = $this->readManagerPid();
        if ($runningPid !== null && @Process::kill($runningPid, 0)) {
            echo "manager is already running, pid: {$runningPid}" . PHP_EOL;
            return;
        }

        $startFuncMap = $this->buildWorkerMap((array) $processConfig);
        if ($startFuncMap === []) {
            // A pool cannot be created with zero workers.
            printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);
            return;
        }

        echo "start now.\n";

        if ($this->input->getOption('daemonize')) {
            $logDir = BASE_PATH . '/runtime/logs';
            if (!is_dir($logDir)) {
                @mkdir($logDir, 0755, true);
            }
            // Redirect standard output/error to log files. The handles are
            // kept in local variables on purpose: they have to stay open for
            // as long as this method runs, so this must not move to a helper.
            fclose(STDOUT);
            fclose(STDERR);
            $STDOUT = fopen($logDir . '/taskProcess_output.log', 'ab');
            $STDERR = fopen($logDir . '/taskProcess_error.log', 'ab');
            Process::daemon(true, true);
        }

        // Written after daemonizing so that the recorded PID is the daemon's.
        file_put_contents(self::MANAGER_PROCESS_PID_PATH, (string) getmypid());

        //TODO allow several managers / dynamic worker lists via config or a table
        BaseProcess::setProcessName('manager');

        $pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);

        $pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {
            [$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];
            $pm = $func[0]; // the BaseProcess instance behind this worker

            // Name the worker in both modes; previously only coroutine
            // workers received a process title.
            BaseProcess::setProcessName($pm->name . '[' . ($idx + 1) . "/{$pm->nums}]");

            if ($enableCoroutine) {
                run(function () use ($func, $pool, $workerId) {
                    call_user_func($func, $pool, $workerId);
                });
            } else {
                $func($pool, $workerId);
            }
        });

        $pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {
            echo 'process Message,data=' . json_encode($data) . PHP_EOL;
        });

        $pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {
            echo "process WorkerId={$workerId} is stopped" . PHP_EOL;
        });

        $pool->start();
    }

    /**
     * Status: php bin/hyperf.php task status
     *
     * Reports whether the PID stored in the PID file belongs to a live
     * process and, if so, lists that process's direct children.
     */
    protected function status()
    {
        $managerPid = $this->readManagerPid();
        if ($managerPid === null || !@Process::kill($managerPid, 0)) {
            echo "manager is not running" . PHP_EOL;
            return;
        }

        echo "manager is running, pid: {$managerPid}" . PHP_EOL;
        echo (string) shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;
    }

    /**
     * Returns the registry entry for a PID (the current process by default),
     * or null when there is none.
     */
    public function getProcess($pid = -1)
    {
        if ($pid === -1) {
            $pid = getmypid();
        }
        return static::$process[$pid] ?? null;
    }

    /**
     * Returns the whole process registry.
     */
    public function getAllProcess()
    {
        return static::$process;
    }

    /**
     * Reads the manager PID from the PID file.
     *
     * @return int|null a positive PID, or null when the file is missing,
     *                  unreadable or does not contain a positive integer
     */
    private function readManagerPid(): ?int
    {
        if (!is_file(self::MANAGER_PROCESS_PID_PATH)) {
            return null;
        }
        $raw = @file_get_contents(self::MANAGER_PROCESS_PID_PATH);
        if ($raw === false) {
            return null;
        }
        $pid = (int) trim($raw);
        return $pid > 0 ? $pid : null;
    }

    /**
     * Expands the configured process classes into one entry per worker.
     *
     * Each entry is [callable handle, bool enableCoroutine, int index], where
     * index is the zero-based position of the worker within its own class.
     * Entries that are not enabled BaseProcess subclasses are skipped.
     */
    private function buildWorkerMap(array $processConfig): array
    {
        $workers = [];
        foreach ($processConfig as $processClass) {
            // Check the type before instantiating, so unrelated classes are
            // neither constructed nor asked for BaseProcess-only properties.
            if (!is_string($processClass) || !is_subclass_of($processClass, BaseProcess::class)) {
                printf("skip %s: not a subclass of %s\n", is_string($processClass) ? $processClass : gettype($processClass), BaseProcess::class);
                continue;
            }

            $processObj = new $processClass;
            if (!$processObj->isEnable || $processObj->nums <= 0) {
                continue;
            }

            for ($i = 0; $i < $processObj->nums; $i++) {
                $workers[] = [
                    [$processObj, 'handle'],
                    $processObj->enableCoroutine ?? false,
                    $i,
                ];
            }
        }
        return $workers;
    }
}
基础process封装
此处可以用hyperf框架自带的,也可以自己封装
<?php
declare (strict_types = 1);
namespace App\Process;
use Swoole;
use Swoole\Process\Pool;
/**
 * Base class for workers started by the task-process command.
 *
 * A subclass sets the public configuration properties and implements
 * beforeRun() and run(); handle() drives the loop and reacts to SIGTERM.
 * Properties and abstract methods stay untyped so that existing subclasses
 * keep compiling.
 */
abstract class BaseProcess
{
    /**
     * Number of worker processes to start for this class.
     * @var int
     */
    public $nums = 0;

    /**
     * Process name; derived from the class name when left empty.
     * @var string
     */
    public $name = '';

    /**
     * Whether the worker body runs inside a coroutine.
     * @var bool
     */
    public $enableCoroutine = true;

    /**
     * Whether this process is started together with the service.
     * @var bool
     */
    public $isEnable = true;

    /** @var bool false once the main loop has been asked to end */
    protected $isRunning = true;

    /** @var mixed the process object handed to processInit() */
    protected $process;

    /** @var int last signal recorded by the signal handler (0 = none) */
    public static $signal = 0;

    public function __construct()
    {
        // Auto-name the process: class name without this namespace, with
        // backslashes replaced by dots.
        if (empty($this->name)) {
            $relative = str_replace(__NAMESPACE__, '', static::class);
            $this->name = trim(str_replace('\\', '.', $relative), '.');
        }
    }

    /**
     * Worker entry point: initialise, then call run() repeatedly until
     * SIGTERM has been recorded.
     */
    final public function handle(Pool $pool, int $workerId): void
    {
        $this->processInit($pool->getProcess());
        $this->beforeRun();

        while (true) {
            // Handlers installed with pcntl_signal() only execute when pending
            // signals are dispatched; without this call SIGTERM is queued
            // forever and the loop can never observe it.
            pcntl_signal_dispatch();

            if (self::$signal === SIGTERM) {
                $this->onProcessExit();
                break;
            }
            $this->run();
        }
    }

    /**
     * Marks the main loop as finished.
     */
    protected function onProcessExit()
    {
        $this->isRunning = false;
    }

    /**
     * Stores the process object and installs the SIGTERM handler used for a
     * graceful stop (wait for running work, or give up after a timeout).
     */
    protected function processInit($process)
    {
        $this->process = $process;
        echo "process {$this->name} start, pid: " . getmypid() . PHP_EOL;

        pcntl_signal(SIGTERM, function () {
            self::$signal = SIGTERM;
            $maxWaitTime = config('server.settings.max_wait_time', 5);
            $sTime = time();

            // Poll every 500 ms until the worker may exit.
            Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {
                $coStat = \Swoole\Coroutine::stats();

                if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {
                    // Main loop ended and no other coroutine is left.
                    Swoole\Timer::clearAll();
                    $this->process->exit();
                } elseif (time() - $sTime >= $maxWaitTime) {
                    // Waited long enough: force the exit.
                    Swoole\Timer::clearAll();
                    if ($this->isRunning) {
                        $this->onProcessExit();
                    }
                    $this->process->exit();
                }
            });
        });
    }

    /**
     * Sets the OS process title to "<APP_NAME>.taskProcess.<name>".
     */
    public static function setProcessName(string $name)
    {
        swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);
    }

    /**
     * Called once before the loop starts.
     */
    abstract public function beforeRun();

    /**
     * One iteration of the loop. Must return: handle() checks for SIGTERM
     * only between calls, so an endless loop here prevents a graceful stop.
     */
    abstract public function run();
}
使用demo
demo1
<?php
declare (strict_types = 1);
namespace App\Process;
/**
 * Demo worker: prints the current time on every loop iteration.
 */
class TestProcess extends BaseProcess
{
    /**
     * Number of worker processes.
     * @var int
     */
    public $nums = 5;

    public $enableCoroutine = true;

    /**
     * Not started together with the service.
     * @var bool
     */
    public $isEnable = false;

    /**
     * Hook executed once before the loop; place initialisation work here.
     */
    public function beforeRun()
    {
    }

    /**
     * Loop body: print a timestamp, then pause for 1000 microseconds.
     */
    public function run()
    {
        $now = date('Y-m-d H:i:s');
        echo $now, PHP_EOL;
        usleep(1000);
    }
}
demo2
<?php
namespace App\Process;
use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;
class UpdateZeroStock extends BaseProcess
{
    /** Prefix of the Redis keys built by getZeroStockKey(). */
    const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';

    /**
     * Number of worker processes.
     * @var int
     */
    public $nums = 1;

    public $enableCoroutine = true;

    /**
     * Started together with the service.
     * @var bool
     */
    public $isEnable = true;

    /**
     * Hook executed once before the loop; place initialisation work here.
     */
    public function beforeRun()
    {
    }

    /**
     * Loop body: run one update pass, print a timestamp, sleep 300 seconds.
     */
    public function run()
    {
        $this->updateZeroStock();
        echo date('Y-m-d H:i:s'), PHP_EOL;
        sleep(300);
    }

    /**
     * Loads up to 1000 hqchip goods rows and publishes a message for each
     * row whose per-day Redis hash exists but has no truthy entry for its SPU.
     */
    public function updateZeroStock()
    {
        $goodsList = HqchipGoodsModel::select(['id', 'spu', 'stock', 'manufacturer'])
            ->where('excute_time', '<', 8)
            ->limit(1000)
            ->get();

        $container = ApplicationContext::getContainer();
        $redis = $container->get(Redis::class);
        $producer = $container->get(Producer::class);
        $today = date('Y-m-d');

        if (!$goodsList) {
            return;
        }

        foreach ($goodsList as $goods) {
            $spu = trim($goods['spu']);
            $cacheKey = $this->getZeroStockKey($today, 'hqchip', $goods['manufacturer']);

            // Publish only when the hash exists and holds nothing for this SPU.
            if (!$redis->exists($cacheKey) || $redis->hGet($cacheKey, $spu)) {
                continue;
            }

            // NOTE(review): if $goods is a model object this aliases it rather
            // than copying, so the two keys below are set on the model itself -
            // confirm that the producer expects that.
            $payload = $goods;
            $payload['appKey'] = $this->appSecretKey();
            $payload['platform'] = 'hqchip';

            $published = $producer->produce(new UpdateZeroStockProducer($payload));
            echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' . $published . PHP_EOL;
        }
    }

    /**
     * Builds the zero-stock cache key "<prefix>:<platform>:<day>:<brand>".
     *
     * @param string $day      date string used in the key
     * @param string $platform platform name
     * @param string $brand    brand / manufacturer
     * @return string
     */
    private function getZeroStockKey($day, $platform, $brand)
    {
        return implode(':', [self::ZERO_STOCK_KEY, $platform, $day, $brand]);
    }

    /**
     * Builds the app key sent with each message: base64 of
     * "<md5(seed)>||<base64(timestamp|seed)>", where the seed contains
     * today's date.
     *
     * @return string
     */
    private function appSecretKey()
    {
        $seed = 'chipmall-spider&V2&' . date('Y-m-d');
        $digest = md5($seed);
        $stamped = base64_encode(time() . '|' . $seed);

        return base64_encode($digest . '||' . $stamped);
    }
}
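在配置中声明需要执行的进程服务(上面的 start() 通过 config('processes') 读取进程类列表,通常对应 config/autoload/processes.php,例如返回 [App\Process\UpdateZeroStock::class])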
以守护进程方式启动服务
php bin/hyperf.php task start -d
查看进程命令
ps -ef|grep taskProcess
疑惑
这次封装还存在两个需要完善的地方:
1. 重复执行
php bin/hyperf.php task start -d
会启动多个 manager 进程(启动前没有检查 PID 文件对应的进程是否仍在运行)。
2. 没有封装查看进程状态的 status 方法。文章来源:https://uudwc.com/A/Y6d4J