workerman-todpole 执行流程(1)

该系列文章主要是完全扒一下 workerman todpole 游戏的实现原理。

提早打个预防针:php

因为 Worker 类的静态属性和子类对象的混用(当前类的静态属性存放当前类对象,静态方法循环静态属性遍历调用子类对象的方法),特别再加上 Master-Worker 进程组织模型,对一些刚接触的人来讲,很容易形成理解上的偏差,当形成理解上的混乱时,咱们须要明确:html

进程和类对象没有直接关系,进程内不论是静态方法、普通对象都属于该进程空间,静态属性也不会跨进程共享。当 fork 以后,咱们所阅读的代码已经被复制到了另外一个进程空间内(栈/堆/常量等),不要被那些 Worker 子类误导了。linux

另外,Master-Worker 进程模型中的 Worker 是子进程的表述,属于进程概念,跟 Worker 类不是一回事。web

入口

入口文件 start.php,包含了 composer autoload,并执行下面的入口,建立相应的对象:编程

  • start_web.php(WebServer)
  • start_register.php(Register)
  • start_gateway.php(Gateway)
  • start_businessworker.php(BusinessWorker)

这 4 个子类其实都继承自 Worker 类(咱们暂且把它们叫作 “角色”):后端

看一下 Worker 的构造方法:api

public function __construct($socket_name = '', $context_option = array())
{
// Save all worker instances.
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
static::$_pidMap[$this->workerId] = array();
 
// Get autoload root path.
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
 
// Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option[ 'socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}

能够看到入口中建立对象时,实际上是放到了当前类的静态属性 $_workers 中,这里的 workerId 用 get_class($this) 也是同样的效果,为了方便表述,暂且把 workerId 叫作 “角色”。另外,子进程的 pid 在静态属性 $_pidMap 中是以子类名进行归类的。服务器

建立完对象,全部的子类对象都到了 $_workers 中,接下来就是执行静态方法 runAll网络

runAll

该静态方法把功能都拆了出去,子方法都为 protected static 方便 override 重写:session

public static function runAll()
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
}

看一下每一个方法的大致功能:

checkSapiEnv()

主要是限制只能在 cli 模式下运行。

init()

完成一些初始化操做:指定 .pid 文件、建立日志文件、设置主进程状态为已开始、记录开始时间、设置主进程名称、调用静态方法 initId() 按照每一个角色的数量建立 $_idMap 静态属性(用来占位),最后经过 Timer::init() 安装一个 alarm 闹钟信号处理(定时器)。

parseCommand()

该方法用来解析命令行参数,start 并无作太多事情,只是判断了一下 .pid 文件是否存在,防止重复启动,并判断是否有 -d 肯定是否在后台执行。

daemonize()

该方法是让一个进程成为守护进程的核心,因为 linux 做为分时多用户系统,须要标记每一个登录的用户,因此须要 “会话” 的概念,当用户经过终端登录后,通常会存在如下几类进程:

看一下 daemonize() 的代码:

protected static function daemonize()
{
if (!static::$daemonize || static::$_OS !== 'linux') {
return;
}
umask( 0);
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception('fork fail');
} elseif ($pid > 0) {
exit(0);
}
if (-1 === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception("fork fail");
} elseif (0 !== $pid) {
exit(0);
}
}

该方法的核心是 posix_setsid(),它能够建立一个会话,因为会话一般是独占终端的,致使当前会话期内的某个进程建立会话后没法 “抢到” 终端,进而变成后端进程,也就是咱们所说的 “守护进程”。

另外,在 php 中能够经过 pcntl 扩展的 pcntl_fork() 方法从当前进程 fork 一个新的进程,daemonize() 方法之因此须要在建立会话先后各 fork 一次,是因为:

  • 组长进程没法建立会话,因此会在建立会话以前先 fork 一次,这样保证子进程必定不是组长,以后退出主进程,在子进程中建立会话
  • 在子进程中建立会话以后,此时的子进程已是会话的首进程(session leader),为了不当前的 leader 再次链接终端,所以须要再 fork 一次以后退出主进程

通过这些步骤以后,获得的子进程即 Master 进程。

initWorkers()

首先遍历全部 workers 对象,对一些名称和权限作下判断,接着若是端口非复用($reusePort 属性),会直接调用对象方法 listen() 开启监听:

if (!$worker->reusePort) {
$worker->listen();
}

注意这里的 listen() 是循环出的 Worker 对象调用的,例如当前有 2 个 WebServer、1 个 Register、4 个 Gateway、4 个 BusinessWorker 进程,若是端口是非复用,在 forkWorkers() 以前,只会建立 3 个 socket 监听(BusinessWorker 没有),这一点从端口号上就能够看出来(图片来自 WebSocket实战之————Workerman服务器的安装启动):

这些监听地址对应的是建立角色对象时的构造参数 $socket_name(后面咱们遇到的内部监听地址基于 lanIp 属性)。

如今以 Gateway 为例,此时端口是非复用的,那为何 4 个 Gateway 进程依然能够监听同一个端口?事实上这 4 个 socket 是在 forkWorkers() 以后子进程从父进程的 Gateway 对象那里继承来的。

这种状况下,当监听的端口有客户端请求过来时,就会形成 “惊群” 现象(Linux网络编程“惊群”问题总结),即这 4 个 Gateway 进程都会被唤醒,但通过内核调度以后,只有一个进程可以处理请求,其他的失败后继续休眠。

而开启监听端口复用后,能够容许多个无亲缘关系的进程监听相同的端口,而且由系统内核作负载均衡,决定将 socket 链接交给哪一个进程处理,避免了惊群效应,能够提高多进程短链接应用的性能,listen() 中开启复用的代码:

if ($this->reusePort) {
stream_context_set_option( $this->_context, 'socket', 'so_reuseport', 1);
}

这段代码实际上是在下面的 forkWorkers() 子进程中开启端口复用的状况下才会被调用;另外,开启端口复用须要 php 版本 >=7.0。

installSignal()

接下来就是为当前主进程安装信号处理,handler 的代码:

public static function signalHandler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
static::$_gracefulStop = false;
static::stopAll();
break;
// Graceful stop.
case SIGTERM:
static::$_gracefulStop = true;
static::stopAll();
break;
// Reload.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
static::$_gracefulStop = true;
} else{
static::$_gracefulStop = false;
}
static::$_pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGUSR2:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}

能够看到 SIGINT/SIGTERM 都是中止,SIGQUIT/SIGUSR1 为重启,SIGUSR2 为输出进程状态,SIGIO输出链接状态。

saveMasterPid()

建立当前主进程的 .pid 文件。

displayUI()

输出命令行下的界面。

forkWorkers()

这里只讨论在 linux 下的状况,经过遍历 $_workers 全部的角色对象,判断 $_pidMap 中当前角色已经有的 pid 数量和角色对象 count 属性须要的工做进程数量,直到每一个角色都建立出 count 须要的数量。

看一下 linux 下建立工做进程的逻辑:

protected static function forkOneWorkerForLinux($worker)
{
// Get available worker id.
$id = static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
} // For child processes.
elseif (0 === $pid) {
if ($worker->reusePort) {
$worker->listen();
}
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
static::$_workers = array($worker->workerId => $worker);
Timer::delAll();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
$err = new Exception('event-loop exited');
static::log($err);
exit(250);
} else {
throw new Exception("forkOneWorker fail");
}
}

首先会调用 getId() 判断:

protected static function getId($worker_id, $pid)
{
return array_search($pid, static::$_idMap[$worker_id]);
}

在以前的静态方法 init() 中已经为每一个角色对象开辟了相应 count 数量的位置,这里判断 $_idMap就是占座的过程;而且以后若是子进程退出,$_idMap 对应 id 的位置就会空缺,从新建立子进程就是填坑的过程。

回到 forkOneWorkerForLinux() 的逻辑,fork 以后,主进程记录下 pid 就结束了,子进程首先会判断端口是否可复用,可复用则调用 listen() 开启监听,不过须要注意的是,以前的 initWorkers() 中非复用状况下建立的是属于角色对象的 socket(3 个),而这里若是开启复用,是每一个子进程中的当前角色对象本身建立出来的 socket,而不是像非复用那样从主进程的相应对象继承;也就是说有几个子进程就建立几个 socket(前提是子进程对应角色对象建立时有 $socket_name),请注意和非复用状况进行区分。

接着重定向 std 输出和错误,因为 $_pidMap 是提供给主进程监控用的,对子进程来讲没什么意义,因此直接清空;紧接着调用 Timer::delAll() 结束可能的闹钟信号,并清除全部可能存在的 event loop。

设置完当前进程的一些属性以后,调用 $worker->run() 子进程就正式执行它本身的逻辑了,run() 里面的流程咱们放到后面来解析,先接着原有的方法往下看。

resetStd()

重定向主进程的 STDOUT 和 STDERR,由于进程已经脱离了终端,输出可能致使一些未知的异常,因此须要重定向到 /dev/null 这台黑洞设备上;从上面的 forkWorkers() 能够看到,这个方法也会在子进程中使用。

monitorWorkers()

通过 forkWorkers() 以后,子进程已经使用定时器或者进入 event loop 处理本身的业务去了,所以这个方法只会在主进程内执行。

该方法的做用就是经过 wait 挂起,直到收到子进程退出信号以后,来决定是补上仍是直接退出,看一下监控逻辑:

protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
// Calls signal handlers for pending signals.
pcntl_signal_dispatch();
 
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
 
// Calls signal handlers for pending signals again.
pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker = static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
 
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
static::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
 
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
 
// Mark id is available.
$id = static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
 
break;
}
}
// Is still running state then fork a new worker process.
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::forkWorkers();
// If reloading continue.
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
static::reload();
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (!static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
}

这里先提一下 pcntl_signal_dispatch(),该方法是为了去信号队列看一下有没有发送给当前进程的信号,并触发主进程的 signalHandler(),固然咱们也可使用 declare(ticks) 来决定执行 ticks 行基础代码后自动调用 handler,但毕竟不是每次自动调用时都会有信号,因此咱们须要主动调用 pcntl_signal_dispatch() 来触发以减小性能损失。

回到 monitorWorkersForLinux() 的逻辑,该方法为一个死循环,核心方法为 pcntl_wait($status, WUNTRACED),经过该方法将主进程挂起,直到收到子进程退出的信号;引用类型的参数 $status 能够拿到 exit 值,好比代码里常常看到的 exit(250)

拿到退出的子进程号以后,将 $_pidMap 中相应的记录删除,并经过 pid 和 getId() 找到 id 以后把 $_idMap 中的占位腾出来(置为 0)。

紧接着判断当前进程状态,因为 shutdown 会标记主进程状态为 STATUS_SHUTDOWN,因此若是是非结束状态,会继续 fork 工做进程来填补 $_idMap 以前腾出来的空缺,而且若是 $_pidsToRestart 中有此 pid,即表明该工做进程须要重启。

若是此时主进程状态被标记为 STATUS_SHUTDOWN 而且 getAllWorkerPids() 存在工做进程,则调用 exitAndClearAll() 来结束掉全部进程。

至此,主进程逻辑分析完毕。

listen()

该方法既有可能在主进程,也可能在子进程中调用,但无论调用时机如何,必定是在 run() 方法开始前调用的,因此仍是放到本文来分析,看一下代码:

public function listen()
{
if (!$this->_socketName) {
return;
}
 
// Autoload.
Autoloader::setRootPath( $this->_autoloadRootPath);
 
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(static::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
 
if (!isset(static::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = $scheme;
}
 
$local_socket = static::$_builtinTransports[$this->transport] . ":" . $address;
 
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option( $this->_context, 'socket', 'so_reuseport', 1);
}
 
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
 
if ($this->transport === 'ssl') {
stream_socket_enable_crypto( $this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, 2);
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
 
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream( $this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}
 
// Non blocking.
stream_set_blocking( $this->_mainSocket, 0);
}
 
$this->resumeAccept();
}

能够看到主要工做是根据建立对象时的 $socket_name 参数值,建立对应的 socket-server 放到当前对象的 $this->_mainSocket 属性中,再调用 resumeAccept() 方法:

public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}

能够看到该方法是向 event-loop 注册当前 $this->_mainSocket 的可读事件,即当有客户端链接上时,根据链接类型调用 acceptConnection() 或 acceptUdpConnection(),因为本系列文章分析的是 todpole游戏,因此这里暂且只分析 tcp 长连接:

public function acceptConnection($socket)
{
// Accept a connection on server socket.
$new_socket = @stream_socket_accept($socket, 0, $remote_address);
// Thundering herd.
if (!$new_socket) {
return;
}
 
// TcpConnection.
$connection = new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->transport = $this->transport;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
 
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func( $this->onConnect, $connection);
} catch (\Exception $e) {
static::log($e);
exit(250);
} catch (\Error $e) {
static::log($e);
exit(250);
}
}
}

该方法主要工做是经过 stream_socket_accept() 包装一下以前的 socket-server,并针对每一个 socket-accept 建立一个 Connection 对象放到 $this->connections 属性中,而且这里遇到了第二个重要的回调 onConnect(),该回调每一个链接事件只会触发一次。

ConnectionInterface

到了这里,咱们已经能够监听客户端的链接,可是当客户端上传数据时,应该如何处理呢?这时咱们须要沿着上面建立的 Connection 对象继续分析下去。

上面方法中的 TcpConnection 继承自 ConnectionInterface(看名字觉得是个接口,实际上是个抽象类),看一下 TcpConnection 的构造方法:

public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
stream_set_blocking( $this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer( $this->_socket, 0);
}
Worker::$globalEvent->add( $this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
}

该方法中咱们找到了 socket-accept 的读取回调 baseRead(),如今咱们知道数据的读取是在建立 Connection 对象时经过注册 event-loop 可读事件来触发的。

baseRead() 逻辑很长,这里就不贴出来了,不过在代码中找到了第三个重要的回调 onMessage(),回顾一下 listen() 的主要工做,简单的归纳为:

  1. 向 event-loop 注册 socket-server 的可读事件,回调 acceptConnection() 经过 stream_socket_accept() 获得 socket-accept 并建立 Connection 对象;
  2. 建立 Connection 对象的构造方法中向 event-loop 注册 socket-accept 的可读事件,回调 baseRead()中解析数据,并触发回调 onMessage()

须要注意的是,event-loop 在子进程调用 run() 以前尚未开始循环,因此上面分析的回调逻辑只是注册,还不会触发。

本文结束,下一篇文章开始分析子进程的工做流程。

相关文章
相关标签/搜索