从源码说swoole进程间通讯原理

  1. 本文件假设你有c++和多进程编程的基础知识。
  2. swoole进程间通讯可使用套接字(swoole_process::write/ swoole_process::read),也可使用消息队列(push/pop)。本文的只讲述套接字通讯部分。
  3. 本文使用的swoole源码为1.9版本

1. swoole_process中的__construct和start究竟作了什么

为了说明swoole进程间是如何使用unix socket进行通讯的,咱们先从源码入手,看看__construct和start函数究竟作了些什么。对于源码,咱们只选取和本问题相关的部分进行解读。php

1.1 __construct

swoole_process.c 
static PHP_METHOD(swoole_process, __construct)
{
    /*pipe_type便是__constrct的第三参数$create_pipe, 默认为2*/
    long pipe_type = 2;

    if (pipe_type > 0)
    {
        swPipe *_pipe = emalloc(sizeof(swWorker));
        int socket_type = pipe_type == 1 ? SOCK_STREAM : ,SOCK_DGRAM;
        /*建立pipe, 本质是套接字*/
        if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
        {
            RETURN_FALSE;
        }

        process->pipe_object = _pipe;
        /*获取主进程用于读写的文件描述符*/
        process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
        /*获取工做进程用于读写的文件描述符*/
        process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); 
        process->pipe = process->pipe_master;
        /*对当前swoole_process对象(php中new出的)的属性值pipe进行赋值*/
        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC)
}

swPipeUnsock_create作了什么呢?html

pipe/PipeUnsock.c 
int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{

    p->blocking = blocking;
    /*建立本地套接字*/
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    else
    {
        /*
         * swoole_config.h:#define SW_SOCKET_BUFFER_SIZE      (8*1024*1024)
         * 由此可知套接字buffer大小为8M
         */
        int sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(object->socks[0], sbsize);
        swSocket_set_buffer_size(object->socks[1], sbsize);
    }

    return 0;
}

1.2 start

swoole_process.c
static PHP_METHOD(swoole_process, start)
{
    swWorker *process = swoole_get_object(getThis());

    /*
     * 建立进程
     * 注意:
     * 1. 建立后父子进程中各有一个swoole_process对象,它们是双胞胎。
     * 2. 子进程继承父进程打开的文件描述符,因此以前__construct时建立的套接子,子进程中也有一份
     */

    pid_t pid = fork();
    if (pid < 0)
    {
        swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    /*主进程逻辑*/
    else if (pid > 0)
    {
        process->pid = pid;
        process->child_process = 0;
        zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
        RETURN_LONG(pid);
    }
    /*子进程逻辑*/
    else
    {
        process->child_process = 1;
        SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
    }
    RETURN_TRUE;
}

子进程逻辑在php_swoole_process_start()中执行,咱们继续看react

swoole_process.c
int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
{
    /*
     * 设置子进程中的swoole_process用于通讯的描述符
     * 对比前文,主进程中的swoole_process使用的是process->pipe_master
     */
    process->pipe = process->pipe_worker;
    process->pid = getpid();

    /*更新子进程中swoole_process(php对象)的相应属性*/
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);
}

可见 
__construt的主要工做是使用socketpair建立一对套接字,并指定主进程中的swoole_process对象用于读写的套接字。 
start的主要工做是,建立子进程,设置子进程中的swoole_process对象用于读写的套接字。c++

2. 通讯原理

2.1 read/write源码解读

swoole_process.c
static PHP_METHOD(swoole_process, read)
{
    /*默认每次读写大小*/
    long buf_size = 8192;

    /*上限值*/
    if (buf_size > 65536)
    {
        buf_size = 65536;
    }

    swWorker *process = swoole_get_object(getThis());

    char *buf = emalloc(buf_size + 1);
    /*从进程中保留的套接字中读取数据*/
    int ret = read(process->pipe, buf, buf_size);;

    /*设置php函数swoole_process->read返回值*/
    SW_ZVAL_STRINGL(return_value, buf, ret, 0);
}
swoole_process.c
static PHP_METHOD(swoole_process, write)
{
    int ret;

    /*如下两种状况的本质都是调用write函数向process-pipe中写入数据*/
    //async write
    if (SwooleG.main_reactor)
    {
        ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
    }
    else
    {
        ret = swSocket_write_blocking(process->pipe, data, data_len);
    }

    ZVAL_LONG(return_value, ret);
}

2.2 通讯原理总结

swoole进程间使用套接字通讯的原理以下: 
1. 父进程使用socketpair建立一对套接字 
2. 建立子进程时,子进程继承了这对套接字 
3. 父子进程使用系统的read,write函数对各自的套接字进行读写完成通讯。 
4. 对于多个子进程,父进程实际上是为每一个子进程建立一对套接字用于通讯。 
5. 子进程之间的通讯,好比A向B发消息,本质是fork A进程时,A从父进程处继承了向B发消息的套接字,从而完成了向B的通讯。编程

3.关于SOCK_STREAM与SOCK_DGRAM

3.1 手册中的一个错误

手册中说:默认的方式是流式。但从1.1节的__construct源码中咱们能够看到,默认使用的是SOCK_DGRAM方式swoole

3.2 SOCK_STREAM与SOCK_DGRAM的区别

此参数经__construct的第三参数传入,最终做用于socketpair的protocol字段socket

ret = socketpair(AF_UNIX, protocol, 0, object->socks);

在一般意义来讲SOCK_STREAM与SOCK_DGRAM分别用于tcp通讯和udp通讯,前者有序(先发先至),可靠;后者不保证顺序及数据可靠性。但在本地套接字中,因为是本机两进程通讯,不会涉及数据丢失,乱序等问题。那么这两个参数的区别在哪呢? 
下面是我看到的一个很是清晰明了的解释:async

The difference between SOCK_STREAM and SOCK_DGRAM is in the semantics of consuming data out of the socket.tcp

Stream socket allows for reading arbitrary number of bytes, but still preserving byte sequence. In other words, a sender might write 4K of data to the socket, and the receiver can consume that data byte by byte. The other way around is true too - sender can write several small messages to the socket that the receiver can consume in one read. Stream socket does not preserve message boundaries.函数

Datagram socket, on the other hand, does preserve these boundaries - one write by the sender always corresponds to one read by the receiver (even if receiver’s buffer given to read(2) or recv(2) is smaller then that message).

也就是说SOCK_STREAM是流式的,数据没有消息边界,发送方屡次写入的数据,可能读取方一次就读取了。发送一次写入的数据,读取方可能分屡次才读完。回到swoole意味着这种方式下,write与read的次数并非一一对应的。你须要本身设置边界来切分消息。

SOCK_DGRAM方式,数据自然是有边界的,读写次数必定是一一对应的。回到swoole,意味着这种方式下,只要你的单条消息不超单次读写上限(默认8192字节),就不须要自行设置边界来切分消息。

看一个例子

<?php                                                                                 
$process1 = new swoole_process(function($process){                                    
    $i = 1;                                                                           
    while (true) {                                                                    
        $msg = $process->read();                                                      
        echo $msg,"\n";                                                               
        echo "read $i time\n";                                                        
        $i++;                                                                         
    }                                                                                 
}, false, 1);                                                                         

$num = 10;                                                                            

$process2 = new swoole_process(function($process) use ($process1, $num){              
    for ($i=0; $i<$num; $i++){                                                        
        $msg = $process1->write("hello 1 i'm 2;");                                    
        if ($i % 5 == 0){                                                             
            sleep(1);                                                                 
        }                                                                             
    }                                                                                 
});

$process2->start();                                                                   
$process1->start();

new第三参数设为1,使用SOCK_STREAM通讯。运行结果以下:

hello 1 i'm 2;
read 1 time
hello 1 i'm 2;hello 1 i'm 2;hello 1 i'm 2;
read 2 time
hello 1 i'm 2;hello 1 i'm 2;
read 3 time
hello 1 i'm 2;hello 1 i'm 2;
read 4 time
hello 1 i'm 2;hello 1 i'm 2;
read 5 time

process2向process1写了10次数据,process1用了5次读完 
将new第三参数设为2,使用SOCK_DGRAM通讯。运行结果以下:

hello 1 i'm 2;
read 1 time
hello 1 i'm 2;
read 2 time
hello 1 i'm 2;
read 3 time
hello 1 i'm 2;
read 4 time
hello 1 i'm 2;
read 5 time
hello 1 i'm 2;
read 6 time
hello 1 i'm 2;
read 7 time
hello 1 i'm 2;
read 8 time
hello 1 i'm 2;
read 9 time
hello 1 i'm 2;
read 10 time

10次写对应10次读

相关文章
相关标签/搜索