PHP Socket 深度探索 (一)

简介

Socket(套接字)一直是网络层的底层核心内容,也是 TCP/IP 以及 UDP 底层协议的实现通道。随着互联网信息时代的爆炸式发展,当代服务器的性能问题面临愈来愈大的挑战,著名的 C10K 问题(http://www.kegel.com/c10k.html)也随之出现。幸好经过大牛们的不懈努力,区别于传统的 select/poll 的 epoll/kqueue 方式出现了,目前 linux2.6 以上的内核都广泛支持,这是 Socket 领域一项巨大的进步,不只解决了 C10K 问题,也渐渐成为了当代互联网的底层核心技术。libevent 库就是其中一个比较出彩的项目(如今很是多的开源项目都有用到,包括 Memcached),感兴趣的朋友能够研究一下。php

因为网络上系统介绍这个部分的文章并很少,而涉及 PHP 的就更少了,因此石头君在这里但愿经过《Socket深度探究4PHP》这个系列给对这个领域感兴趣的读者们必定的帮助,也但愿你们能和我一块儿对这个问题进行更深刻的探讨。首先,解释一下目前 Socket 领域比较易于混淆的概念有:阻塞/非阻塞、同步/异步、多路复用等。html

阅读准备

一、阻塞/非阻塞:这两个概念是针对 IO 过程当中进程的状态来讲的,阻塞 IO 是指调用结果返回以前,当前线程会被挂起;相反,非阻塞指在不能马上获得结果以前,该函数不会阻塞当前线程,而会马上返回。linux

二、同步/异步:这两个概念是针对调用若是返回结果来讲的,所谓同步,就是在发出一个功能调用时,在没有获得结果以前,该调用就不返回;相反,当一个异步过程调用发出后,调用者不能马上获得结果,实际处理这个调用的部件在完成后,经过状态、通知和回调来通知调用者。编程

三、多路复用(IO/Multiplexing):为了提升数据信息在网络通讯线路中传输的效率,在一条物理通讯线路上创建多条逻辑通讯信道,同时传输若干路信号的技术就叫作多路复用技术。对于 Socket 来讲,应该说能同时处理多个链接的模型都应该被称为多路复用,目前比较经常使用的有 select/poll/epoll/kqueue 这些 IO 模型(目前也有像 Apache 这种每一个链接用单独的进程/线程来处理的 IO 模型,可是效率相对比较差,也很容易出问题,因此暂时不作介绍了)。在这些多路复用的模式中,异步阻塞/非阻塞模式的扩展性和性能最好。缓存

同步阻塞IO模型

socket_server.php服务器

<?php  
/** 
 * SocketServer Class 
 * By James.Huang <shagoo#gmail.com> 
**/  
set_time_limit(0);  
class SocketServer   
{  
    private static $socket;  
    function SocketServer($port)   
    {  
        global $errno, $errstr;  
        if ($port < 1024) {  
            die("Port must be a number which bigger than 1024/n");  
        }  
          
        $socket = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr);  
        if (!$socket) die("$errstr ($errno)");  
          
//      stream_set_timeout($socket, -1); // 保证服务端 socket 不会超时,彷佛没用:)  
          
        while ($conn = stream_socket_accept($socket, -1)) { // 这样设置不超时才油用  
            static $id = 0;  
            static $ct = 0;  
            $ct_last = $ct;  
            $ct_data = '';  
            $buffer = '';  
            $id++; // increase on each accept  
            echo "Client $id come./n";  
            while (!preg_match('//r?/n/', $buffer)) { // 没有读到结束符,继续读  
//              if (feof($conn)) break; // 防止 popen 和 fread 的 bug 致使的死循环  
                $buffer = fread($conn, 1024);  
                echo 'R'; // 打印读的次数  
                $ct += strlen($buffer);  
                $ct_data .= preg_replace('//r?/n/', '', $buffer);  
            }  
            $ct_size = ($ct - $ct_last) * 8;  
            echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";  
            fwrite($conn, "Received $ct_size byte data./r/n");  
            fclose($conn);  
        }  
          
        fclose($socket);  
    }  
}  
new SocketServer(2000);

socket_client.php网络

<?php  
/** 
 * Socket Test Client 
 * By James.Huang <shagoo#gmail.com> 
**/  
function debug ($msg)  
{  
//  echo $msg;  
    error_log($msg, 3, '/tmp/socket.log');  
}  
if ($argv[1]) {  
      
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);  
      
//  stream_set_blocking($socket_client, 0);  
//  stream_set_timeout($socket_client, 0, 100000);  
      
    if (!$socket_client) {  
        die("$errstr ($errno)");  
    } else {  
        $msg = trim($argv[1]);  
        for ($i = 0; $i < 10; $i++) {  
            $res = fwrite($socket_client, "$msg($i)");  
            usleep(100000);  
            echo 'W'; // 打印写的次数  
//          debug(fread($socket_client, 1024)); // 将产生死锁,由于 fread 在阻塞模式下未读到数据时将等待  
        }  
        fwrite($socket_client, "/r/n"); // 传输结束符  
        debug(fread($socket_client, 1024));  
        fclose($socket_client);  
    }  
}  
else {  
      
//  $phArr = array();  
//  for ($i = 0; $i < 10; $i++) {  
//      $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');  
//  }  
//  foreach ($phArr as $ph) {  
//      pclose($ph);  
//  }  
      
    for ($i = 0; $i < 10; $i++) {  
        system("php ".__FILE__." '{$i}:test'");  
    }  
}

代码分析

首先,解释一下以上的代码逻辑:客户端 socket_client.php 循环发送数据,最后发送结束符;服务端 socket_server.php 使用 accept 阻塞方式接收 socket 链接,而后循环接收数据,直到收到结束符,返回结果数据(接收到的字节数)。虽然逻辑很简单,可是其中有几种状况很值得分析一下:并发

A> 默认状况下,运行 php socket_client.php test,客户端打出 10 个 W,服务端打出若干个 R 后面是接收到的数据,/tmp/socket.log 记录下服务端返回的接收结果数据。这种状况很容易理解,再也不赘述。而后,使用 telnet 命令同时打开多个客户端,你会发现服务器一个时间只处理一个客户端,其余须要在后面“排队”;这就是阻塞 IO 的特色,这种模式的弱点很明显,效率极低。异步

B> 只打开 socket_client.php 第 26 行的注释代码,再次运行 php socket_client.php test 客户端打出一个 W,服务端也打出一个 R,以后两个程序都卡住了。这是为何呢,分析逻辑后你会发现,这是因为客户端在未发送结束符以前就向服务端要返回数据;而服务端因为未收到结束符,也在向客户端要结束符,形成死锁。而之因此只打出一个 W 和 R,是由于 fread 默认是阻塞的。要解决这个死锁,必须打开 socket_client.php 第 16 行的注释代码,给 socket 设置一个 0.1 秒的超时,再次运行你会发现隔 0.1 秒出现一个 W 和 R 以后正常结束,服务端返回的接收结果数据也正常记录了。可见 fread 缺省是阻塞的,咱们在编程的时候要特别注意,若是没有设置超时,就很容易会出现死锁。socket

C> 只打开 15 行注释,运行 php socket_client.php test,结果基本和状况 A 相同,惟一不一样的是 /tmp/socket.log 没有记录下返回数据。这里能够看出客户端运行在阻塞和非阻塞模式的区别,固然在客户端不在意接受结果的状况下,可使用非阻塞模式来得到最大效率。

D> 运行 php socket_client.php 是连续运行 10 次上面的逻辑,这个没什么问题;可是很奇怪的是若是你使用 35 - 41 行的代码,用 popen 同时开启 10 个进程来运行,就会形成服务器端的死循环,十分怪异!后来经调查发现只要是用 popen 打开的进程建立的链接会致使 fread 或者 socket_read 出错直接返回空字串,从而致使死循环,查阅 PHP 源代码后发现 PHP 的 popen 和 fread 函数已经彻底不是 C 原生的了,里面都插入了大量的 php_stream_* 实现逻辑,初步估计是其中的某个 bug 致使的 Socket 链接中断所致使的,解决方法就是打开 socket_server.php 中 31 行的代码,若是链接中断则跳出循环,可是这样一来就会有不少数据丢失了,这个问题须要特别注意!

同步非阻塞IO模型

select_server.php

<?php  
/** 
 * SelectSocketServer Class 
 * By James.Huang <shagoo#gmail.com> 
**/  
set_time_limit(0);  
class SelectSocketServer   
{  
    private static $socket;  
    private static $timeout = 60;  
    private static $maxconns = 1024;  
    private static $connections = array();  
    function SelectSocketServer($port)   
    {  
        global $errno, $errstr;  
        if ($port < 1024) {  
            die("Port must be a number which bigger than 1024/n");  
        }  
          
        $socket = socket_create_listen($port);  
        if (!$socket) die("Listen $port failed");  
          
        socket_set_nonblock($socket); // 非阻塞  
          
        while (true)   
        {  
            $readfds = array_merge(self::$connections, array($socket));  
            $writefds = array();  
              
            // 选择一个链接,获取读、写链接通道  
            if (socket_select($readfds, $writefds, $e = null, $t = self::$timeout))   
            {  
                // 若是是当前服务端的监听链接  
                if (in_array($socket, $readfds)) {  
                    // 接受客户端链接  
                    $newconn = socket_accept($socket);  
                    $i = (int) $newconn;  
                    $reject = '';  
                    if (count(self::$connections) >= self::$maxconns) {  
                        $reject = "Server full, Try again later./n";  
                    }  
                    // 将当前客户端链接放入 socket_select 选择  
                    self::$connections[$i] = $newconn;  
                    // 输入的链接资源缓存容器  
                    $writefds[$i] = $newconn;  
                    // 链接不正常  
                    if ($reject) {  
                        socket_write($writefds[$i], $reject);  
                        unset($writefds[$i]);  
                        self::close($i);  
                    } else {  
                        echo "Client $i come./n";  
                    }  
                    // remove the listening socket from the clients-with-data array  
                    $key = array_search($socket, $readfds);  
                    unset($readfds[$key]);  
                }  
                  
                // 轮循读通道  
                foreach ($readfds as $rfd) {  
                    // 客户端链接  
                    $i = (int) $rfd;  
                    // 从通道读取  
                    $line = @socket_read($rfd, 2048, PHP_NORMAL_READ);  
                    if ($line === false) {  
                        // 读取不到内容,结束链接            
                        echo "Connection closed on socket $i./n";  
                        self::close($i);  
                        continue;  
                    }  
                    $tmp = substr($line, -1);  
                    if ($tmp != "/r" && $tmp != "/n") {  
                        // 等待更多数据  
                        continue;  
                    }  
                    // 处理逻辑  
                    $line = trim($line);  
                    if ($line == "quit") {  
                        echo "Client $i quit./n";  
                        self::close($i);  
                        break;  
                    }  
                    if ($line) {  
                        echo "Client $i >>" . $line . "/n";  
                    }  
                }  
                  
                // 轮循写通道  
                foreach ($writefds as $wfd) {  
                    $i = (int) $wfd;  
                    $w = socket_write($wfd, "Welcome Client $i!/n");  
                }  
            }  
        }  
    }  
      
    function close ($i)   
    {  
        socket_shutdown(self::$connections[$i]);  
        socket_close(self::$connections[$i]);  
        unset(self::$connections[$i]);  
    }  
}  
new SelectSocketServer(2000);

select_client.php

<?php  
/** 
 * SelectSocket Test Client 
 * By James.Huang <shagoo#gmail.com> 
**/  
function debug ($msg)  
{  
//  echo $msg;  
    error_log($msg, 3, '/tmp/socket.log');  
}  
if ($argv[1]) {  
      
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);  
      
//  stream_set_timeout($socket_client, 0, 100000);  
      
    if (!$socket_client) {  
        die("$errstr ($errno)");  
    } else {  
        $msg = trim($argv[1]);  
        for ($i = 0; $i < 10; $i++) {  
            $res = fwrite($socket_client, "$msg($i)/n");  
            usleep(100000);  
//          debug(fread($socket_client, 1024)); // 将产生死锁,由于 fread 在阻塞模式下未读到数据时将等待  
        }  
        fwrite($socket_client, "quit/n"); // add end token  
        debug(fread($socket_client, 1024));  
        fclose($socket_client);  
    }  
}  
else {  
      
    $phArr = array();  
    for ($i = 0; $i < 10; $i++) {  
        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');  
    }  
    foreach ($phArr as $ph) {  
        pclose($ph);  
    }  
      
//  for ($i = 0; $i < 10; $i++) {  
//      system("php ".__FILE__." '{$i}:test'");  
//  }  
}

代码分析

以上代码的逻辑也很简单,select_server.php 实现了一个相似聊天室的功能,你可使用 telnet 工具登陆上去,和其余用户文字聊天,也能够键入“quit”命令离开;而 select_client.php 则模拟了一个登陆用户连续发 10 条信息,而后退出。这里也分析两个问题:

A> 这里若是咱们执行 php select_client.php 程序将会同时打开 10 个链接,同时进行模拟登陆用户操做;观察服务端打印的数据你会发现服务端确实是在同时处理这些链接,这就是多路复用实现的非阻塞 IO 模型,固然这个模型并无真正的实现异步,由于最终服务端程序仍是要去通道里面读取数据,获得结果后同步返回给客户端。若是此次你也使用 telnet 命令同时打开多个客户端,你会发现服务端能够同时处理这些链接,这就是非阻塞 IO,固然比古老的阻塞 IO 效率要高多了,可是这种模式仍是有局限的,继续看下去你就会发现了~

B> 我在 select_server.php 中设置了几个参数,你们能够调整试试:
$timeout :表示的是 select 的超时时间,这个通常来讲不要过短,不然会致使 CPU 负载太高。
$maxconns :表示的是最大链接数,客户端超过这个数的话,服务器会拒绝接收。这里要提到的一点是,因为 select 是经过句柄来读写的,因此会受到系统默认参数 __FD_SETSIZE 的限制,通常默认值为 1024,修改的话须要从新编译内核;另外经过测试发现 select 模式的性能会随着链接数的增大而线性便差(详情见《Socket深度探究4PHP(二)》),这也就是 select 模式最大的问题所在,因此若是是超高并发服务器建议使用下一种模式。

异步非阻塞IO模型

epoll_server.php

<?php  
/** 
 * EpollSocketServer Class (use libevent) 
 * By James.Huang <shagoo#gmail.com> 
 *  
 * Defined constants: 
 *  
 * EV_TIMEOUT (integer) 
 * EV_READ (integer) 
 * EV_WRITE (integer) 
 * EV_SIGNAL (integer) 
 * EV_PERSIST (integer) 
 * EVLOOP_NONBLOCK (integer) 
 * EVLOOP_ONCE (integer) 
**/  
set_time_limit(0);  
class EpollSocketServer  
{  
    private static $socket;  
    private static $connections;  
    private static $buffers;  
      
    function EpollSocketServer ($port)  
    {  
        global $errno, $errstr;  
          
        if (!extension_loaded('libevent')) {  
            die("Please install libevent extension firstly/n");  
        }  
          
        if ($port < 1024) {  
            die("Port must be a number which bigger than 1024/n");  
        }  
          
        $socket_server = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr);  
        if (!$socket_server) die("$errstr ($errno)");  
          
        stream_set_blocking($socket_server, 0); // 非阻塞  
          
        $base = event_base_new();  
        $event = event_new();  
        event_set($event, $socket_server, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);  
        event_base_set($event, $base);  
        event_add($event);  
        event_base_loop($base);  
          
        self::$connections = array();  
        self::$buffers = array();  
    }  
      
    function ev_accept($socket, $flag, $base)   
    {  
        static $id = 0;  
      
        $connection = stream_socket_accept($socket);  
        stream_set_blocking($connection, 0);  
      
        $id++; // increase on each accept  
      
        $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);  
        event_buffer_base_set($buffer, $base);  
        event_buffer_timeout_set($buffer, 30, 30);  
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);  
        event_buffer_priority_set($buffer, 10);  
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);  
      
        // we need to save both buffer and connection outside  
        self::$connections[$id] = $connection;  
        self::$buffers[$id] = $buffer;  
    }  
      
    function ev_error($buffer, $error, $id)   
    {  
        event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);  
        event_buffer_free(self::$buffers[$id]);  
        fclose(self::$connections[$id]);  
        unset(self::$buffers[$id], self::$connections[$id]);  
    }  
      
    function ev_read($buffer, $id)   
    {  
        static $ct = 0;  
        $ct_last = $ct;  
        $ct_data = '';  
        while ($read = event_buffer_read($buffer, 1024)) {  
            $ct += strlen($read);  
            $ct_data .= $read;  
        }  
        $ct_size = ($ct - $ct_last) * 8;  
        echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";  
        event_buffer_write($buffer, "Received $ct_size byte data./r/n");  
    }  
      
    function ev_write($buffer, $id)   
    {  
        echo "[$id] " . __METHOD__ . "/n";  
    }  
}  
new EpollSocketServer(2000);

epoll_client.php

<?php  
/** 
 * EpollSocket Test Client 
 * By James.Huang <shagoo#gmail.com> 
**/  
function debug ($msg)  
{  
//  echo $msg;  
    error_log($msg, 3, '/tmp/socket.log');  
}  
if ($argv[1]) {  
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);  
//  stream_set_blocking($socket_client, 0);  
    if (!$socket_client) {  
        die("$errstr ($errno)");  
    } else {  
        $msg = trim($argv[1]);  
        for ($i = 0; $i < 10; $i++) {  
            $res = fwrite($socket_client, "$msg($i)");  
            usleep(100000);  
            debug(fread($socket_client, 1024));  
        }  
        fclose($socket_client);  
    }  
}  
else {  
      
    $phArr = array();  
    for ($i = 0; $i < 10; $i++) {  
        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');  
    }  
    foreach ($phArr as $ph) {  
        pclose($ph);  
    }  
      
//  for ($i = 0; $i < 10; $i++) {  
//      system("php ".__FILE__." '{$i}:test'");  
//  }  
}

代码解析

先说一下,以上的例子是基于 PHP 的 libevent 扩展实现的,须要运行的话要先安装此扩展,参考:http://pecl.php.net/package/l...

这个例子作的事情和前面介绍的第一个模型同样,epoll_server.php 实现的服务端也是接受客户端数据,而后返回结果(接收到的字节数)。可是,当你运行 php epoll_client.php 的时候你会发现服务端打印出来的结果和 accept 阻塞模型就大不同了,固然运行效率也有极大的提高,这是为何呢?接下来就介绍一下 epoll/kqueue 模型:在介绍 select 模式的时候咱们提到了这种模式的局限,而 epoll 就是为了解决 poll 的这两个缺陷而生的。首先,epoll 模式基本没有限制(参考 cat /proc/sys/fs/file-max 默认就达到 300K,很使人兴奋吧,其实这也就是所谓基于 epoll 的 Erlang 服务端能够同时处理这么多并发链接的根本缘由,不过如今 PHP 理论上也能够作到了,呵呵);另外,epoll 模式的性能也不会像 select 模式那样随着链接数的增大而变差,测试发现性能仍是很稳定的(下篇会有详细介绍)。

epoll 工做有两种模式 LT(level triggered) 和 ET(edge-triggered),前者是缺省模式,同时支持阻塞和非阻塞 IO 模式,虽然性能比后者差点,可是比较稳定,通常来讲在实际运用中,咱们都是用这种模式(ET 模式和 WinSock 都是纯异步非阻塞模型)。而另一点要说的是 libevent 是在编译阶段选择系统的 I/O demultiplex 机制的,不支持在运行阶段根据配置再次选择,因此咱们在这里也就不细讨论 libevent 的实现的细节了,若是朋友有兴趣进一步了解的话,请参考:http://monkey.org/~provos/lib...

到这里,第一部分的内容结束了,相信你们已经了解了 Socket 编程的几个重点概念和一些实战技巧,在下一篇《Socket深度探究4PHP(二) 》我将会对 select/poll/epoll/kqueue 几种模式作一下深刻的介绍和对比,另外也会涉及到两种重要的 I/O 多路复用模式:Reactor 和 Proactor 模式。

To be continued ...

相关文章
相关标签/搜索