工做须要,了解学习swoole相关,这是一个swoole process的实例。封装了一个简单的类库,实现了一个能够动态扩容的进程池,该进程池默认会建立min_worker_num个进程来处理任务,当发现进程不够用的时候,会自动建立子进程执行任务。任务结束后,子进程均可被回收。php
<?php /** * process. * User: xiongzai * Date: 2016/6/27 * Time: 14:00 */ class Process { private $process_list = []; // 进程池对象数组 private $process_use = []; // 进程占用标记数组 private $min_worker_num = 0; //进程池初始化进程数 private $max_worker_num = 10; // 进程池进程数最大值 private $current_num; // 当前进程数 public $redis; //测试 public无妨 public function __construct() { $this->initRedis(); //实例化redis // 初始化进程池 for($i = 0 ; $i < $this->min_worker_num ; $i++){ $this->createProcess(); } //注册信号 $this->signal(); //测试跑的太快,10秒后关闭进程而已。这里仅测试用来关闭进程。 swoole_timer_tick(10000, function($timer_id) { foreach ($this->process_list as $process) { $process->write("exit"); } swoole_timer_clear($timer_id); }); } //这里须要redis扩展 private function initRedis(){ $redis = new Redis(); $redis->connect('127.0.0.1', 6379 ); $redis->auth(666666); $redis->select(0); $this->redis = $redis; } /** * 建立进程并返回PID * @return mixed */ private function createProcess(){ $process = new swoole_process(array($this, 'task_run') , false , 2); $pid = $process->start(); $this->process_list[$pid] = $process; $this->process_use[$pid] = 0; //绑定子进程管道的读事件,接收子进程任务结束的通知 swoole_event_add($process->pipe, function ($pipe) use($process) { $pid = $process->read(); echo "PID:".$pid." end".PHP_EOL; //重置为等待任务状态。那么这个进程能够本身去接收新任务处理了,具体本身实现。 $this->process_use[$pid] = 0; }); $this->current_num += 1; return $pid; } /** * 添加任务并分配空闲进程执行 * @param string $params * @return bool 返回是否有进程执行任务 */ public function task($params){ $result = false; foreach ($this->process_use as $pid => $used) { // 找到了闲置的进程 if($used == 0) { $result = true; $this->process_use[$pid] = 1; // 派发任务 $this->process_list[$pid]->write($params); } } //没有可用进程执行任务 新建立进程执行 if($result == false && $this->current_num < $this->max_worker_num){ $pid = $this->createProcess(); $this->process_use[$pid] = 1; $this->process_list[$pid]->write($params); $result = true; } return $result; } public function task_run($worker) { // 注册监听管道的事件,接收任务 swoole_event_add($worker->pipe, function ($pipe) use ($worker){ $data = $worker->read(); if($data == 'exit'){ $worker->exit(); // 收到退出指令,关闭子进程 exit; } $this->task_do($data); // 执行完成,通知父进程 $worker->write($worker->pid); }); } private function task_do($data){ echo 'i:'.$data.PHP_EOL; } //注册各类信号 private function signal(){ // 注册信号,回收退出的子进程 swoole_process::signal(SIGCHLD, function($sig) { while($ret = swoole_process::wait(false)) { echo "PID={$ret['pid']} out\n"; } }); } } $process = new Process(); while (true){ if($process->redis->exists('tastList')){ for ($i = 1 ; $i <= 100 ; $i ++){ $task_id = $process->redis->lPop('tastList'); if(!$process->task($task_id)){ $process->redis->lPush('tastList',$task_id); } } sleep(1); //每秒钟100个 感受差很少了 } }
代码如上,运行效果图以下:(注意:tastList里面只有1到10)redis
运行效果看起来还能够的样子。可是若是把$min_worker_num初始化线程池的数量改成5,再看以下效果图:数组
为何出现了那么多task_id:1呢?这个问题得好好想一想,本文最后解答。swoole
以上只是简单的例子,实际运用中还会有各类状况。进程池的功能可能远比此复杂,这里只是抛砖引玉。 若是不须要动态扩容,能够建立足够多的子进程,开启消息队列模式,设置抢占,空闲进程自动处理任务。用定时任务派发任务,也能够经过定时器来主动退出一段时间内没有处理任务的子进程等等。本身撸代码折腾去。学习
如今回答为何那么多task_id:1的问题:测试
以上已经贴了代码以及运行效果图,甚至还写了那么多文字,其实那些都是浮云。其实我也只是想来问问为何出现那么多的task_id:1,跪求大神帮忙看下而已、而已、而已……………………,答案:我不知道啊。this