提早打个预防针:php
因为 Worker 类的静态属性和子类对象的混用(当前类的静态属性存放当前类对象,静态方法循环静态属性遍历调用子类对象的方法),特别再加上 Master-Worker 进程组织模型,对一些刚接触的人来讲,很容易形成理解上的偏差,当形成理解上的混乱时,咱们须要明确:html
进程和类对象没有直接关系,进程内不论是静态方法、普通对象都属于该进程空间,静态属性也不会跨进程共享。当 fork 以后,咱们所阅读的代码已经被复制到了另外一个进程空间内(栈/堆/常量等),不要被那些 Worker 子类误导了。linux
另外,Master-Worker 进程模型中的 Worker 是子进程的表述,属于进程概念,跟 Worker 类不是一回事。web
入口
入口文件 start.php
,包含了 composer autoload,并执行下面的入口,建立相应的对象:编程
- start_web.php(WebServer)
- start_register.php(Register)
- start_gateway.php(Gateway)
- start_businessworker.php(BusinessWorker)
这 4 个子类其实都继承自 Worker
类(咱们暂且把它们叫作 “角色”):后端
看一下 Worker
的构造方法:api
public function __construct($socket_name = '', $context_option = array())
{
// Save all worker instances.
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
static::$_pidMap[$this->workerId] = array();
// Get autoload root path.
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
// Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option[
'socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}
|
能够看到入口中建立对象时,实际上是放到了当前类的静态属性 $_workers
中,这里的 workerId
用 get_class($this)
也是同样的效果,为了方便表述,暂且把 workerId
叫作 “角色”。另外,子进程的 pid 在静态属性 $_pidMap
中是以子类名进行归类的。服务器
建立完对象,全部的子类对象都到了 $_workers
中,接下来就是执行静态方法 runAll
。网络
runAll
该静态方法把功能都拆了出去,子方法都为 protected static
方便 override 重写:session
public static function runAll()
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
}
|
看一下每一个方法的大致功能:
checkSapiEnv()
主要是限制只能在 cli 模式下运行。
init()
完成一些初始化操做:指定 .pid 文件、建立日志文件、设置主进程状态为已开始、记录开始时间、设置主进程名称、调用静态方法 initId()
按照每一个角色的数量建立 $_idMap
静态属性(用来占位),最后经过 Timer::init()
安装一个 alarm
闹钟信号处理(定时器)。
parseCommand()
该方法用来解析命令行参数,start 并无作太多事情,只是判断了一下 .pid 文件是否存在,防止重复启动,并判断是否有 -d
肯定是否在后台执行。
daemonize()
该方法是让一个进程成为守护进程的核心,因为 linux 做为分时多用户系统,须要标记每一个登录的用户,因此须要 “会话” 的概念,当用户经过终端登录后,通常会存在如下几类进程:
看一下 daemonize()
的代码:
protected static function daemonize()
{
if (!static::$daemonize || static::$_OS !== 'linux') {
return;
}
umask(
0);
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception('fork fail');
}
elseif ($pid > 0) {
exit(0);
}
if (-1 === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception("fork fail");
}
elseif (0 !== $pid) {
exit(0);
}
}
|
该方法的核心是 posix_setsid()
,它能够建立一个会话,因为会话一般是独占终端的,致使当前会话期内的某个进程建立会话后没法 “抢到” 终端,进而变成后端进程,也就是咱们所说的 “守护进程”。
另外,在 php 中能够经过 pcntl
扩展的 pcntl_fork()
方法从当前进程 fork 一个新的进程,daemonize()
方法之因此须要在建立会话先后各 fork 一次,是因为:
- 组长进程没法建立会话,因此会在建立会话以前先 fork 一次,这样保证子进程必定不是组长,以后退出主进程,在子进程中建立会话
- 在子进程中建立会话以后,此时的子进程已是会话的首进程(session leader),为了不当前的 leader 再次链接终端,所以须要再 fork 一次以后退出主进程
通过这些步骤以后,获得的子进程即 Master 进程。
initWorkers()
首先遍历全部 workers 对象,对一些名称和权限作下判断,接着若是端口非复用($reusePort
属性),会直接调用对象方法 listen()
开启监听:
if (!$worker->reusePort) {
$worker->listen();
}
|
注意这里的 listen()
是循环出的 Worker
对象调用的,例如当前有 2 个 WebServer
、1 个 Register
、4 个 Gateway
、4 个 BusinessWorker
进程,若是端口是非复用,在 forkWorkers()
以前,只会建立 3 个 socket 监听(BusinessWorker
没有),这一点从端口号上就能够看出来(图片来自 WebSocket实战之————Workerman服务器的安装启动):
这些监听地址对应的是建立角色对象时的构造参数 $socket_name
(后面咱们遇到的内部监听地址基于 lanIp
属性)。
如今以 Gateway 为例,此时端口是非复用的,那为何 4 个 Gateway 进程依然能够监听同一个端口?事实上这 4 个 socket 是在 forkWorkers()
以后子进程从父进程的 Gateway
对象那里继承来的。
这种状况下,当监听的端口有客户端请求过来时,就会形成 “惊群” 现象(Linux网络编程“惊群”问题总结),即这 4 个 Gateway
进程都会被唤醒,但通过内核调度以后,只有一个进程可以处理请求,其他的失败后继续休眠。
而开启监听端口复用后,能够容许多个无亲缘关系的进程监听相同的端口,而且由系统内核作负载均衡,决定将 socket 链接交给哪一个进程处理,避免了惊群效应,能够提高多进程短链接应用的性能,listen()
中开启复用的代码:
if ($this->reusePort) {
stream_context_set_option(
$this->_context, 'socket', 'so_reuseport', 1);
}
|
这段代码实际上是在下面的 forkWorkers()
子进程中开启端口复用的状况下才会被调用;另外,开启端口复用须要 php 版本 >=7.0。
installSignal()
接下来就是为当前主进程安装信号处理,handler 的代码:
public static function signalHandler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
static::$_gracefulStop = false;
static::stopAll();
break;
// Graceful stop.
case SIGTERM:
static::$_gracefulStop = true;
static::stopAll();
break;
// Reload.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
static::$_gracefulStop = true;
}
else{
static::$_gracefulStop = false;
}
static::$_pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGUSR2:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}
|
能够看到 SIGINT
/SIGTERM
都是中止,SIGQUIT
/SIGUSR1
为重启,SIGUSR2
为输出进程状态,SIGIO
输出链接状态。
saveMasterPid()
建立当前主进程的 .pid 文件。
displayUI()
输出命令行下的界面。
forkWorkers()
这里只讨论在 linux 下的状况,经过遍历 $_workers
全部的角色对象,判断 $_pidMap
中当前角色已经有的 pid 数量和角色对象 count
属性须要的工做进程数量,直到每一个角色都建立出 count
须要的数量。
看一下 linux 下建立工做进程的逻辑:
protected static function forkOneWorkerForLinux($worker)
{
// Get available worker id.
$id =
static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
}
// For child processes.
elseif (0 === $pid) {
if ($worker->reusePort) {
$worker->listen();
}
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
static::$_workers = array($worker->workerId => $worker);
Timer::delAll();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
$err =
new Exception('event-loop exited');
static::log($err);
exit(250);
}
else {
throw new Exception("forkOneWorker fail");
}
}
|
首先会调用 getId()
判断:
protected static function getId($worker_id, $pid)
{
return array_search($pid, static::$_idMap[$worker_id]);
}
|
在以前的静态方法 init()
中已经为每一个角色对象开辟了相应 count
数量的位置,这里判断 $_idMap
就是占座的过程;而且以后若是子进程退出,$_idMap
对应 id 的位置就会空缺,从新建立子进程就是填坑的过程。
回到 forkOneWorkerForLinux()
的逻辑,fork 以后,主进程记录下 pid 就结束了,子进程首先会判断端口是否可复用,可复用则调用 listen()
开启监听,不过须要注意的是,以前的 initWorkers()
中非复用状况下建立的是属于角色对象的 socket(3 个),而这里若是开启复用,是每一个子进程中的当前角色对象本身建立出来的 socket,而不是像非复用那样从主进程的相应对象继承;也就是说有几个子进程就建立几个 socket(前提是子进程对应角色对象建立时有 $socket_name
),请注意和非复用状况进行区分。
接着重定向 std 输出和错误,因为 $_pidMap
是提供给主进程监控用的,对子进程来讲没什么意义,因此直接清空;紧接着调用 Timer::delAll()
结束可能的闹钟信号,并清除全部可能存在的 event loop。
设置完当前进程的一些属性以后,调用 $worker->run()
子进程就正式执行它本身的逻辑了,run()
里面的流程咱们放到后面来解析,先接着原有的方法往下看。
resetStd()
重定向主进程的 STDOUT 和 STDERR,由于进程已经脱离了终端,输出可能致使一些未知的异常,因此须要重定向到 /dev/null
这台黑洞设备上;从上面的 forkWorkers()
能够看到,这个方法也会在子进程中使用。
monitorWorkers()
通过 forkWorkers()
以后,子进程已经使用定时器或者进入 event loop 处理本身的业务去了,所以这个方法只会在主进程内执行。
该方法的做用就是经过 wait
挂起,直到收到子进程退出信号以后,来决定是补上仍是直接退出,看一下监控逻辑:
protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
// Calls signal handlers for pending signals.
pcntl_signal_dispatch();
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status =
0;
$pid = pcntl_wait($status, WUNTRACED);
// Calls signal handlers for pending signals again.
pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker =
static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
static::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
// Mark id is available.
$id =
static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
break;
}
}
// Is still running state then fork a new worker process.
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::forkWorkers();
// If reloading continue.
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
static::reload();
}
}
else {
// If shutdown state and all child processes exited then master process exit.
if (!static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
else {
// If shutdown state and all child processes exited then master process exit.
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
}
|
这里先提一下 pcntl_signal_dispatch()
,该方法是为了去信号队列看一下有没有发送给当前进程的信号,并触发主进程的 signalHandler()
,固然咱们也可使用 declare(ticks)
来决定执行 ticks
行基础代码后自动调用 handler,但毕竟不是每次自动调用时都会有信号,因此咱们须要主动调用 pcntl_signal_dispatch()
来触发以减小性能损失。
回到 monitorWorkersForLinux()
的逻辑,该方法为一个死循环,核心方法为 pcntl_wait($status, WUNTRACED)
,经过该方法将主进程挂起,直到收到子进程退出的信号;引用类型的参数 $status
能够拿到 exit 值,好比代码里常常看到的 exit(250)
。
拿到退出的子进程号以后,将 $_pidMap
中相应的记录删除,并经过 pid 和 getId()
找到 id 以后把 $_idMap
中的占位腾出来(置为 0)。
紧接着判断当前进程状态,因为 shutdown 会标记主进程状态为 STATUS_SHUTDOWN
,因此若是是非结束状态,会继续 fork 工做进程来填补 $_idMap
以前腾出来的空缺,而且若是 $_pidsToRestart
中有此 pid,即表明该工做进程须要重启。
若是此时主进程状态被标记为 STATUS_SHUTDOWN
而且 getAllWorkerPids()
存在工做进程,则调用 exitAndClearAll()
来结束掉全部进程。
至此,主进程逻辑分析完毕。
listen()
该方法既有可能在主进程,也可能在子进程中调用,但无论调用时机如何,必定是在 run()
方法开始前调用的,因此仍是放到本文来分析,看一下代码:
public function listen()
{
if (!$this->_socketName) {
return;
}
// Autoload.
Autoloader::setRootPath(
$this->_autoloadRootPath);
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(static::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
if (!isset(static::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
}
else {
$this->transport = $scheme;
}
$local_socket =
static::$_builtinTransports[$this->transport] . ":" . $address;
// Flag.
$flags =
$this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno =
0;
$errmsg =
'';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option(
$this->_context, 'socket', 'so_reuseport', 1);
}
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
if ($this->transport === 'ssl') {
stream_socket_enable_crypto(
$this->_mainSocket, false);
}
elseif ($this->transport === 'unix') {
$socketFile = substr($address,
2);
if ($this->user) {
chown($socketFile,
$this->user);
}
if ($this->group) {
chgrp($socketFile,
$this->group);
}
}
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream(
$this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE,
1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY,
1);
}
// Non blocking.
stream_set_blocking(
$this->_mainSocket, 0);
}
$this->resumeAccept();
}
|
能够看到主要工做是根据建立对象时的 $socket_name
参数值,建立对应的 socket-server 放到当前对象的 $this->_mainSocket
属性中,再调用 resumeAccept()
方法:
public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
}
else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}
|
能够看到该方法是向 event-loop 注册当前 $this->_mainSocket
的可读事件,即当有客户端链接上时,根据链接类型调用 acceptConnection()
或 acceptUdpConnection()
,因为本系列文章分析的是 todpole
游戏,因此这里暂且只分析 tcp 长连接:
public function acceptConnection($socket)
{
// Accept a connection on server socket.
$new_socket = @stream_socket_accept($socket,
0, $remote_address);
// Thundering herd.
if (!$new_socket) {
return;
}
// TcpConnection.
$connection =
new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker =
$this;
$connection->protocol =
$this->protocol;
$connection->transport =
$this->transport;
$connection->onMessage =
$this->onMessage;
$connection->onClose =
$this->onClose;
$connection->onError =
$this->onError;
$connection->onBufferDrain =
$this->onBufferDrain;
$connection->onBufferFull =
$this->onBufferFull;
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func(
$this->onConnect, $connection);
}
catch (\Exception $e) {
static::log($e);
exit(250);
}
catch (\Error $e) {
static::log($e);
exit(250);
}
}
}
|
该方法主要工做是经过 stream_socket_accept()
包装一下以前的 socket-server,并针对每一个 socket-accept 建立一个 Connection 对象放到 $this->connections
属性中,而且这里遇到了第二个重要的回调 onConnect()
,该回调每一个链接事件只会触发一次。
ConnectionInterface
到了这里,咱们已经能够监听客户端的链接,可是当客户端上传数据时,应该如何处理呢?这时咱们须要沿着上面建立的 Connection 对象继续分析下去。
上面方法中的 TcpConnection
继承自 ConnectionInterface
(看名字觉得是个接口,实际上是个抽象类),看一下 TcpConnection
的构造方法:
public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
stream_set_blocking(
$this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer(
$this->_socket, 0);
}
Worker::$globalEvent->add(
$this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
}
|
该方法中咱们找到了 socket-accept 的读取回调 baseRead()
,如今咱们知道数据的读取是在建立 Connection 对象时经过注册 event-loop 可读事件来触发的。
baseRead()
逻辑很长,这里就不贴出来了,不过在代码中找到了第三个重要的回调 onMessage()
,回顾一下 listen()
的主要工做,简单的归纳为:
- 向 event-loop 注册 socket-server 的可读事件,回调
acceptConnection()
经过stream_socket_accept()
获得 socket-accept 并建立 Connection 对象; - 建立 Connection 对象的构造方法中向 event-loop 注册 socket-accept 的可读事件,回调
baseRead()
中解析数据,并触发回调onMessage()
;
须要注意的是,event-loop 在子进程调用 run()
以前尚未开始循环,因此上面分析的回调逻辑只是注册,还不会触发。
本文结束,下一篇文章开始分析子进程的工做流程。