swoole_process 主要是用来代替 PHP 的 pcntl 扩展。咱们知道 pcntl 是用来进行多进程编程的,而 pcntl 只提供了 fork 这样原始的接口,容易使用错误,而且没有提供进程间通讯以及重定向标准输入输出的功能。php
而 swoole_process 则提供了比 pcntl 更强大的功能,更易用的API,使PHP在多进程编程方面更加轻松。html
swoole 有一个 Reactor 线程,这个线程能够说是对 epoll 模型的封装,能够设置 read 事件和 write 事件的监听回调函数。编程
下面会用到一个函数:数组
bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
swoole_client->$sock
、swoole_process->$pipe
或者其余 fd(socket_create
建立的资源 , stream_socket_client/fsockopen
建立的资源)多进程编程少不了进程之间的通信,swoole 的进程之间有两种通讯方式,一种是消息队列(queue),另外一种是管道(pipe)。那么本文使用的是 pipe 的方式。swoole
代码:socket
<?php class ProcessPool{ private $process; /** * Worker 进程数组 * @var array */ private $process_list = []; /** * 正在被使用的进程 * @var array */ private $process_use = []; /** * 最少进程数量 * @var int */ private $min_worker_num = 3; /** * 最多进程数量 * @var int */ private $max_worker_num = 6; /** * 当前进程数量 * @var int */ private $current_num; public function __construct() { $this->process = new swoole_process(array($this, 'run'), false, 2); $this->process->start(); swoole_process::wait(); } public function run() { $this->current_num = $this->min_worker_num; //建立全部的worker进程 for($i = 0; $i < $this->current_num; $i++){ $process = new swoole_process(array($this, 'task_run'), false, 2); $pid = $process->start(); $this->process_list[$pid] = $process; $this->process_use[$pid] = 0; } foreach($this->process_list as $process){ swoole_event_add($process->pipe, function ($pipe) use ($process){ $data = $process->read(); var_dump($data . '空闲'); //接收子进程处理完成的信息,而且重置为空闲 $this->process_use[$data] = 0; }); } //每秒定时向worker管道投递任务 swoole_timer_tick(1000 ,function ($timer_id){ static $index = 0; $index = $index + 1; $flag = true; //是否新建worker foreach ($this->process_use as $pid => $used){ if($used == 0){ $flag = false; //标记为正在使用 $this->process_use[$pid] = 1; // 在父进程内调用write,子进程能够调用read接收此数据 $this->process_list[$pid]->write($index. "hello"); break; } } if($flag && $this->current_num < $this->max_worker_num){ //没有闲置worker,新建worker来处理 $process = new swoole_process(array($this, 'task_run'), false, 2); $pid = $process->start(); $this->process_list[$pid] = $process; $this->process_use[$pid] = 1; $this->process_list[$pid]->write($index. "hello"); $this->current_num++; } var_dump('第' .$index. '个任务'); if($index == 10){ foreach($this->process_list as $process){ $process->write("exit"); } swoole_timer_clear($timer_id); $this->process->exit(); } }); } /** * 子进程处理 * @param $worker */ public function task_run($worker) { swoole_event_add($worker->pipe, function($pipe)use($worker){ $data = $worker->read(); var_dump($worker->pid . ':' . $data); if($data == 'exit'){ $worker->exit(); exit; } //模拟耗时任务 sleep(5); //告诉主进程处理完成 //在子进程内调用write,父进程能够调用read接收此数据 $worker->write($worker->pid); }); } } new ProcessPool();
首先定义几个重要的属性:函数
在实例化的时候建立主进程,而且运行 run 方法,在 run 方法里面先建立全部的 worker 进程,而且设置为空闲状态。oop
接着遍历全部的 worker 进程,而且加入 EventLoop 中,设置可读事件,用于接收子进程的空闲信号。this
最后每隔一秒向 worker 进程投递任务。动态扩充进程池则在这里实现,若是没有闲置的进程,而此时又有新的任务,则须要动态建立一个新的进程而且置为繁忙状态。因为只模拟了十次任务,则第十个任务完成以后在父进程中发送 exit 使全部子进程退出。spa
https://wiki.swoole.com/wiki/...
https://opso.coding.me/2018/0...