前面咱们跟着代码看了一遍workerman的初始化流程.但对于如何监听端口.等操做尚未具体的实现.咱们此次就来看一下.workerman是如何监听端口并运行的.php
在前面咱们初始化方法事后,就开始执行runAll方法.在runAll方法中,有以下几个方法.咱们会一个一个方法介绍.注意.在这里,全部的都是以static来运行的.因此,就是Worker类自己.linux
// 初始化环境变量.是否为linux. static::checkSapiEnv(); // 执行初始化流程 static::init(); // 锁定命令 static::lock(); // 解析命令行 static::parseCommand(); // 是否后台运行 static::daemonize(); // 初始化worker static::initWorkers(); // 安装信号处理 static::installSignal(); // 保存主进程ID static::saveMasterPid(); // 解锁 static::unlock(); // 展现ui static::displayUI(); // fork子进程 static::forkWorkers(); // 重定义输入输出 static::resetStd(); // 监控子进程. static::monitorWorkers();
检查当前命令行的环境.至于为何会有这个函数,主要是因为window下.只能开启一个进程.因此在不少地方都须要对于windows来进行特殊的处理.咱们来看看代码.ubuntu
// 若是不是cli运行,就直接提示. if (php_sapi_name() != "cli") { exit("only run in command line mode n"); } // 若是根据文件分隔符来判断,windows下为,linux为/.这是一种经常使用的方式. if (DIRECTORY_SEPARATOR === '') { self::$_OS = OS_TYPE_WINDOWS; }
这里主要是对错误处理,pid文件,和日志文件的初始化.还有初始化计时器windows
set_error_handler(function($code, $msg, $file, $line){ Worker::safeEcho("$msg in file $file on line $linen"); }); // 这里指定了启动文件.启动文件将用于后续的启动信息. $backtrace = debug_backtrace(); // startFile格式为全路径.例如在/var/www/study/https.php,那么startFile就为/var/www/study/https.php. static::$_startFile = $backtrace[count($backtrace) - 1]['file']; // 根据启动生成惟一的前缀. 根据startFile的全路径生成惟一的前缀. $unique_prefix = str_replace('/', '_', static::$_startFile); // 根据前缀生成pidFile.存放于vendor/workerman下面. if (empty(static::$pidFile)) { static::$pidFile = __DIR__ . "/../$unique_prefix.pid"; } // 生成日志文件. if (empty(static::$logFile)) { static::$logFile = __DIR__ . '/../workerman.log'; } $log_file = (string)static::$logFile; // 若是没有日志文件,就初始话日志文件. if (!is_file($log_file)) { touch($log_file); chmod($log_file, 0622); } // 初始化状态为启动中... static::$_status = static::STATUS_STARTING; // 设置启动时间和状态文件 static::$_globalStatistics['start_timestamp'] = time(); static::$_statisticsFile = sys_get_temp_dir() . "/$unique_prefix.status"; // 设置进程标题 static::setProcessTitle('WorkerMan: master process start_file=' . static::$_startFile); // 初始化worker的idmap. static::initId(); // 初始化计时器. Timer::init();
这里面其中最主要的是初始化worker的idmap,用于记录worker的对应关系. 这里主要存储子worker的pid信息.咱们来看下函数.api
protected static function initId() { // 循环全部的worker,并将全部的进程信息初始化赋值. foreach (static::$_workers as $worker_id => $worker){ $new_id_map = array(); $worker->count = $worker->count <= 0 ? 1 : $worker->count; for($key = 0; $key < $worker->count; $key++) { $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0; } static::$_idMap[$worker_id] = $new_id_map; } }
初始化完成会造成,worker_id => 子进程信息.例如数组
array(1) { ["000000001364d94e00000000462a38e7"]=> array(4) { [0]=> int(0) [1]=> int(0) [2]=> int(0) [3]=> int(0) } }
这样的结构便于将id对应到对应的信息中.socket
protected static function lock() { // 打开锁定的启动文件. $fd = fopen(static::$_startFile, 'r'); // 若是文件没法打卡.则默认认为workerman已经启动了. if (!$fd || !flock($fd, LOCK_EX)) { static::log("Workerman[".static::$_startFile."] already running"); exit; } }
为什么须要锁定启动文件.这里会引起一个问题,若是同一时间启动worker.这样会致使没法预知的错误.因此在这里就先锁定启动文件.就保存的原子性操做.tcp
完成了文件锁定后,就开始解析咱们的命令了.因为这次信息较多.会单独写一个文章来解读.函数
daemonize是否运行在后台,若是咱们在命令行中使用了-d参数,就会执行此函数.可是咱们执行的命令行没有加上-d.因此,就不会执行此函数.咱们能够来看一下.oop
protected static function daemonize() { // 若是不是后台运行,若是不是linux.就直接返回. if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { return; } // 设定umask. umask(0); // 开始分叉,并返回pid.若是分叉失败,pid就为-1.若是成功pid就为0. $pid = pcntl_fork(); if (-1 === $pid) { throw new Exception('fork fail'); } elseif ($pid > 0) { exit(0); } // 将当前会话设置为进程组长,并管理其余子进程. 设置父进程先与子进程退出,子进程则会被1号进程收养,这个子进程就会成为init的子进程. if (-1 === posix_setsid()) { throw new Exception("setsid fail"); } // 再次分叉进程.以免SVR4系统从新得到终端的控制. $pid = pcntl_fork(); if (-1 === $pid) { throw new Exception("fork fail"); } elseif (0 !== $pid) { exit(0); } }
执行完后台运行事后,就开始初始化worker了.
protected static function initWorkers() { // 若是系统不是linux就直接返回. if (static::$_OS !== OS_TYPE_LINUX) { return; } // 开始初始化worker foreach (static::$_workers as $worker) { // 设置worker name. if (empty($worker->name)) { $worker->name = 'none'; } // 设置worker的user if (empty($worker->user)) { $worker->user = static::getCurrentUser(); } else { if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) { static::log('Warning: You must have the root privileges to change uid and gid.'); } } // 设置socket名称. $worker->socket = $worker->getSocketName(); // 设置状态. $worker->status = '<g> [OK] </g>'; // 设置ui. foreach(static::getUiColumns() as $column_name => $prop){ !isset($worker->{$prop}) && $worker->{$prop}= 'NNNN'; $prop_length = strlen($worker->{$prop}); $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength'; static::$$key = max(static::$$key, $prop_length); } // 监听. if (!$worker->reusePort) { $worker->listen(); } }}
初始化完worker事后,就监听信号集了.在这里监听的信号集.信号集只能在linux中使用.
protected static function installSignal() { if (static::$_OS !== OS_TYPE_LINUX) { return; } // 进程关闭,Ctrl+c时触发 pcntl_signal(SIGINT, array('WorkermanWorker', 'signalHandler'), false); // 进程强制关闭,例如使用kill.等命令关闭进程时触发. pcntl_signal(SIGTERM, array('WorkermanWorker', 'signalHandler'), false); // 进程重启 pcntl_signal(SIGUSR1, array('WorkermanWorker', 'signalHandler'), false); // 进程退出 pcntl_signal(SIGQUIT, array('WorkermanWorker', 'signalHandler'), false); // 查看进程的状态 pcntl_signal(SIGUSR2, array('WorkermanWorker', 'signalHandler'), false); // 查看连接状态 pcntl_signal(SIGIO, array('WorkermanWorker', 'signalHandler'), false); // 屏蔽SIGPIPE消息 pcntl_signal(SIGPIPE, SIG_IGN, false); }
将主进程的ID保存到pidFile文件中.
protected static function saveMasterPid() { if (static::$_OS !== OS_TYPE_LINUX) { return; } static::$_masterPid = posix_getpid(); // 将主进程ID保存到pidFile中,pidFile在init时已经指定为vendor/workerman/下面的pid文件中去了. if (false === file_put_contents(static::$pidFile, static::$_masterPid)) { throw new Exception('can not save pid to ' . static::$pidFile); } }
当以上都执行完成事后就解除咱们启动文件的锁定状态.这个函数主要就是进行解锁操做.
protected static function unlock() { $fd = fopen(static::$_startFile, 'r'); $fd && flock($fd, LOCK_UN); }
显示ui.workerman在处理的时候,是先展现的ui.就是打印出相似于如下状况的信息
Workerman[http.php] start in DEBUG mode ----------------------------------------- WORKERMAN ----------------------------------------- Workerman version:3.5.22 PHP version:7.1.23-4+ubuntu18.04.1+deb.sury.org+1 ------------------------------------------ WORKERS ------------------------------------------ proto user worker listen processes status tcp adolph none http://0.0.0.0:2345 4 [OK] --------------------------------------------------------------------------------------------- Press Ctrl+C to stop. Start success.
因此,信息打印出来事后也不必定会正确的监听事件.下面咱们就继续查看.
以上的准备工做完成,就开始fork咱们的子进程了.并设置子进程的监听.
rotected static function forkWorkers() { if (static::$_OS === OS_TYPE_LINUX) { // 若是是linux就按照fork static::forkWorkersForLinux(); } else { // windows下监听 static::forkWorkersForWindows(); } }
根据平台来处理对应的子进程.咱们是在linux中,因此咱们就看一下static::forkWorkersForLinux这个函数.
protected static function forkWorkersForLinux() { // 根据全部的worker,咱们的状态信息在init中存放信息. foreach (static::$_workers as $worker) { if (static::$_status === static::STATUS_STARTING) { if (empty($worker->name)) { $worker->name = $worker->getSocketName(); } $worker_name_length = strlen($worker->name); if (static::$_maxWorkerNameLength < $worker_name_length) { static::$_maxWorkerNameLength = $worker_name_length; } } // 开始fork.从$_pidMap来获取信息并判断.这里的$_pidMap则是由workerId => 子进程的进程号组成的数组. // 而_pidMap则是在new Worker的时候将Worker直接存储进去的.因此在这里咱们就解决了在初始化流程的时候疑问了. while (count(static::$_pidMap[$worker->workerId]) < $worker->count) { // 开始fork static::forkOneWorkerForLinux($worker); } } }
static::forOneWorkerForLinux函数主要是来处理worker信息.这里咱们来看一下.
protected static function forkOneWorkerForLinux($worker) { // 开始获取id信息. 从idMap中获取 // array_search($pid, static::$_idMap[$worker_id]); // 因为咱们的idMap是由workerId=>子进程号组成的数组.因此,第一次查出来的id指定为0. $id = static::getId($worker->workerId, 0); if ($id === false) { return; } // 开始fork $pid = pcntl_fork(); // 若是pid大于0. 则是主进程. if ($pid > 0) { // 存到pidMap中, static::$_pidMap[$worker->workerId][$pid] = $pid; // 将idMap中的id换为进程号.这里的进程号就包含了主进程的信息. static::$_idMap[$worker->workerId][$id] = $pid; } elseif (0 === $pid) { // 若是是子进程. srand(); mt_srand(); // 开始监听事件. if ($worker->reusePort) { $worker->listen(); } // 若是状态为STATUS_STARTING,就重定向输入输出. if (static::$_status === static::STATUS_STARTING) { static::resetStd(); } // 从新初始化pid. 因此,在子进程中.子进程的pidMap为空. static::$_pidMap = array(); // 开始处理解除掉其余worker的监听.保证当前子进程中,只有一个worker来监听事件. foreach(static::$_workers as $key => $one_worker) { if ($one_worker->workerId !== $worker->workerId) { $one_worker->unlisten(); unset(static::$_workers[$key]); } } // 删除全部的定时器. Timer::delAll(); static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName()); // 设置用户和组. $worker->setUserAndGroup(); $worker->id = $id; // 执行run方法. $worker->run(); $err = new Exception('event-loop exited'); static::log($err); exit(250); } else { throw new Exception("forkOneWorker fail"); } }
上面代码若是在linux中运行,就已经分为主进程和子进程来进行处理了.在主进程中包含全部对子进程信息的保存.而子进程则在这里监听方法.须要注意一点的是,子进程中对全部父进程的变量是共享的.咱们来看看子进程是如何监听事件的.$worker->run();
public function run() { //更新状态为运行中. static::$_status = static::STATUS_RUNNING; // 注册关闭事件. register_shutdown_function(array("WorkermanWorker", 'checkErrors')); // 设置root的路径 Autoloader::setRootPath($this->_autoloadRootPath); // 建立事件监听 if (!static::$globalEvent) { $event_loop_class = static::getEventLoopName(); static::$globalEvent = new $event_loop_class; $this->resumeAccept(); } // 从新安装信号集. static::reinstallSignal(); // 初始化计时器 Timer::init(static::$globalEvent); // 设置消息时间. if (empty($this->onMessage)) { $this->onMessage = function () {}; } // 还原错误处理函数. restore_error_handler(); // 触发workerStart事件. if ($this->onWorkerStart) { try { call_user_func($this->onWorkerStart, $this); } catch (Exception $e) { static::log($e); // Avoid rapid infinite loop exit. sleep(1); exit(250); } catch (Error $e) { static::log($e); // Avoid rapid infinite loop exit. sleep(1); exit(250); } } // 等待事件触发. static::$globalEvent->loop(); }
run函数开始触发workerStart事件,并等待worker事件触发.因此这里的globalEvent就是咱们的事件处理函数.以上都是在子进程中完成的处理.子进程就会在这里中止.
当咱们fork完全部的子进程事后,就执行咱们的重定向输入输出.
public static function resetStd() { // 若是不是守护进程. if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { return; } global $STDOUT, $STDERR; // 打开输出的文件. $handle = fopen(static::$stdoutFile, "a"); if ($handle) { unset($handle); set_error_handler(function(){}); fclose($STDOUT); fclose($STDERR); fclose(STDOUT); fclose(STDERR); $STDOUT = fopen(static::$stdoutFile, "a"); $STDERR = fopen(static::$stdoutFile, "a"); // 从新定向到本身定义输出里面. static::outputStream($STDOUT); restoe_error_handler(); } else { throw new Exception('can not open stdoutFile ' . static::$stdoutFile); } }
最后,就是监听咱们的子进程信息.这里因为咱们的篇幅过长,就另起一张写入.
workerman对事件的监听 workerman对子进程的监控