在咱们启动了workerman事后,按照咱们前面的理解.若是是在linux下.worker子进程启动端口复用,并监听事件和处理事件(win忽略).那workerman是如何对事件完成监听和处理的呢.咱们来看一下.linux
在咱们看源码的时候,在forkOneWorkerForLinux有这样一行代码.算法
if ($worker->reusePort) { $worker->listen();}
这里,一看名字就知道是对端口进行监听.咱们来看一下源码swoole
public function listen() { // 这里的socketName就是咱们监听的地址信息. if (!$this->_socketName) { return; } // 设置自动加载的信息. Autoloader::setRootPath($this->_autoloadRootPath); // 须要监听的socket. if (!$this->_mainSocket) { // 解析socket地址. $local_socket = $this->parseSocketAddress(); // 设置基础协议.若是是udp.就为STREAM_SERVER_BIND,若是不是udp就是,就是4 | 8.注意,这里是按位或算法.则为 100(4) | 1000(8) = 1100. $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; $errno = 0; $errmsg = ''; // 设置端口复用. if ($this->reusePort) { \stream_context_set_option($this->\context, 'socket', 'so_reuseport', 1); } // 开始建立socket. $this->_mainSocket = \stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); if (!$this->_mainSocket) { throw new Exception($errmsg); } // 若是transport为ssl,就要特殊处理. if ($this->transport === 'ssl') { \stream_socket_enable_crypto($this->_mainSocket, false); } elseif ($this->transport === 'unix') { // 若是是unix. $socket_file = \substr($local_socket, 7); if ($this->user) { chown($socket_file, $this->user); } if ($this->group) { chgrp($socket_file, $this->group); } } // 尝试打开tcp的keepalive并禁用掉Nagle算法. if (\function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') { \set_error_handler(function(){}); $socket = \socket_import_stream($this->_mainSocket); \socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); \socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); \restore_error_handler(); } // 为资源流设置为非阻塞模式. \stream_set_blocking($this->_mainSocket, 0); } // 设置监听. $this->resumeAccept();}
在这里,workerman首先是建立了一个_mainSocket的socket监听信息.并设置为非阻塞模式.并设置监听.socket
public function resumeAccept() { // 设置事件监听. if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) { // 若是不是udp协议.就采用acceptConnection来才听. if ($this->transport !== 'udp') { static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { // udp协议,就采用acceptUdpConnection来监听 static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } // 设置pauseAccept为false. $this->_pauseAccept = false; } }
咱们来看看acceptConnection.tcp
public function acceptConnection($socket) { // 设置错误信息. \set_error_handler(function(){}); // 接收信息. $new_socket = stream_socket_accept($socket, 0, $remote_address); // 恢复错误信息. \restore_error_handler(); // 若是没有接收成功. if (!$new_socket) { return; } // 底层采用Tcp来监听. $connection = new TcpConnection($new_socket, $remote_address); // 设置connections来监听. $this->connections[$connection->id] = $connection; // 把worker复制进去. $connection->worker = $this; $connection->protocol = $this->protocol; $connection->transport = $this->transport; // 设置事件监听. $connection->onMessage = $this->onMessage; // 关闭连接时的处理. $connection->onClose = $this->onClose; // 设置错误信息. $connection->onError = $this->onError; $connection->onBufferDrain = $this->onBufferDrain; $connection->onBufferFull = $this->onBufferFull; // 若是有onConnect链接处理.就直接调用. if ($this->onConnect) { try { \call_user_func($this->onConnect, $connection); } catch (\Exception $e) { static::log($e); exit(250); } catch (\Error $e) { static::log($e); exit(250); } } }
这里,咱们就完成了对事件的监听.而且每一个连接下面都将worker的信息赋值给TcpConnection的worker中.因此,每个Connection都会有worker信息.完成了事件的监听.那如何处理事件的呢.oop
咱们在worker的run方法中,有这样一句话.ui
static::$globalEvent->loop();
这里采用了死循环来等待来处理事件.咱们在前面看了.$globalEvent是采用workerman本身自己的来监听.它是在run方法中初始化的.this
// run if (!static::$globalEvent) { $event_loop_class = static::getEventLoopName(); static::$globalEvent = new $event_loop_class; $this->resumeAccept(); } // Worker::getEventLoopName() protected static function getEventLoopName() { // 若是存在就直接返回. if (static::$eventLoopClass) { return static::$eventLoopClass; } // 若是存在就删除Swoole\Event. if (!\class_exists('\Swoole\Event', false)) { unset(static::$_availableEventLoops['swoole']); } $loop_name = ''; // protected static $\_availableEventLoops = array( // 'libevent' => '\\Workerman\\Events\\Libevent', // 'event' => '\\Workerman\\Events\\Event' // 'swoole' => '\\Workerman\\Events\\Swoole' // ); foreach (static::$\_availableEventLoops as $name=>$class) { if (\extension_loaded($name)) { $loop_name = $name; break; } } if ($loop\_name) { // 若是存在名称.就使用React下面的事件处理. if (\interface_exists('\React\EventLoop\LoopInterface')) { switch ($loop_name) { case 'libevent': static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop'; break; case 'event': static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop'; break; default : static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop'; break; } } else { // 就默认. 咱们这里采用的Libevent. static::$eventLoopClass = static::$\_availableEventLoops[$loop_name]; } } else { static::$eventLoopClass = \interface_exists('\React\EventLoop\LoopInterface')? '\Workerman\Events\React\StreamSelectLoop':'\Workerman\Events\Select'; } return static::$eventLoopClass; }
以上就是初始化$globalEvent.我当前的环境是libevent.因此咱们就看一下.WorkermanEventsLibevent的loop方法.unix
public function loop() { // 调用了一个event_base_loop. \event_base_loop($this->_eventBase); }
只有一个方法.event_base_loop就等待事件被触发,而后触发他们的事件.这里的是$this->_eventBase.而设置事件则在咱们的add方法中.在listen的时候,咱们调用了以下的方法.rest
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
因此咱们来看一下add.
public function add($fd, $flag, $func, $args = array()) { // 判断信息. switch ($flag) { case self::EV_SIGNAL: $fd_key = (int)$fd; $real_flag = EV_SIGNAL | EV_PERSIST; $this->_eventSignal[$fd_key] = event_new(); if (!\event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null)) { return false; } if (!\event_base_set($this->_eventSignal[$fd_key], $this->_eventBase)) { return false; } if (!\event_add($this->_eventSignal[$fd_key])) { return false; } return true; case self::EV_TIMER: case self::EV_TIMER_ONCE: $event = \event_new(); $timer_id = (int)$event; if (!\event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id)) { return false; } if (!\event_base_set($event, $this->_eventBase)) { return false; } $time_interval = $fd * 1000000; if (!\event_add($event, $time_interval)) { return false; } $this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval); return $timer_id; default : $fd_key = (int)$fd; $real_flag = $flag === self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST; $event = \event_new(); if (!\event_set($event, $fd, $real_flag, $func, null)) { return false; } if (!\event_base_set($event, $this->_eventBase)) { return false; } if (!\event_add($event)) { return false; } $this->_allEvents[$fd_key][$flag] = $event; return true; } }
咱们从以上方法能够精简出大概的几个语句
// 建立一个新的事件. $event = \event_new(); // 设置事件的监听. if (!\event_set($event, $fd, $real_flag, $func, null)) { return false; } // 从新设置event的绑定事件. if (!\event_base_set($event, $this->_eventBase)) { return false; } // 添加事件. if (!\event_add($event)) { return false; } // 保存到全局的事件中去. $this->_allEvents[$fd_key][$flag] = $event;
因此.咱们在run方法中,先设置事件的监听.而后在调用事件的处理.而完成事件的处理则是由咱们worker自带的onMessage方法来处理.
event_set和event_base_set的做用. workerman 对 worker的监控.