为了说明swoole进程间是如何使用unix socket进行通讯的,咱们先从源码入手,看看__construct和start函数究竟作了些什么。对于源码,咱们只选取和本问题相关的部分进行解读。php
swoole_process.c static PHP_METHOD(swoole_process, __construct) { /*pipe_type便是__constrct的第三参数$create_pipe, 默认为2*/ long pipe_type = 2; if (pipe_type > 0) { swPipe *_pipe = emalloc(sizeof(swWorker)); int socket_type = pipe_type == 1 ? SOCK_STREAM : ,SOCK_DGRAM; /*建立pipe, 本质是套接字*/ if (swPipeUnsock_create(_pipe, 1, socket_type) < 0) { RETURN_FALSE; } process->pipe_object = _pipe; /*获取主进程用于读写的文件描述符*/ process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER); /*获取工做进程用于读写的文件描述符*/ process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); process->pipe = process->pipe_master; /*对当前swoole_process对象(php中new出的)的属性值pipe进行赋值*/ zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC) }
swPipeUnsock_create作了什么呢?html
pipe/PipeUnsock.c int swPipeUnsock_create(swPipe *p, int blocking, int protocol) { p->blocking = blocking; /*建立本地套接字*/ ret = socketpair(AF_UNIX, protocol, 0, object->socks); if (ret < 0) { swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno); return SW_ERR; } else { /* * swoole_config.h:#define SW_SOCKET_BUFFER_SIZE (8*1024*1024) * 由此可知套接字buffer大小为8M */ int sbsize = SwooleG.socket_buffer_size; swSocket_set_buffer_size(object->socks[0], sbsize); swSocket_set_buffer_size(object->socks[1], sbsize); } return 0; }
swoole_process.c static PHP_METHOD(swoole_process, start) { swWorker *process = swoole_get_object(getThis()); /* * 建立进程 * 注意: * 1. 建立后父子进程中各有一个swoole_process对象,它们是双胞胎。 * 2. 子进程继承父进程打开的文件描述符,因此以前__construct时建立的套接子,子进程中也有一份 */ pid_t pid = fork(); if (pid < 0) { swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } /*主进程逻辑*/ else if (pid > 0) { process->pid = pid; process->child_process = 0; zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC); RETURN_LONG(pid); } /*子进程逻辑*/ else { process->child_process = 1; SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC)); } RETURN_TRUE; }
子进程逻辑在php_swoole_process_start()中执行,咱们继续看react
swoole_process.c int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC) { /* * 设置子进程中的swoole_process用于通讯的描述符 * 对比前文,主进程中的swoole_process使用的是process->pipe_master */ process->pipe = process->pipe_worker; process->pid = getpid(); /*更新子进程中swoole_process(php对象)的相应属性*/ zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC); zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC); }
可见
__construt的主要工做是使用socketpair建立一对套接字,并指定主进程中的swoole_process对象用于读写的套接字。
start的主要工做是,建立子进程,设置子进程中的swoole_process对象用于读写的套接字。c++
swoole_process.c static PHP_METHOD(swoole_process, read) { /*默认每次读写大小*/ long buf_size = 8192; /*上限值*/ if (buf_size > 65536) { buf_size = 65536; } swWorker *process = swoole_get_object(getThis()); char *buf = emalloc(buf_size + 1); /*从进程中保留的套接字中读取数据*/ int ret = read(process->pipe, buf, buf_size);; /*设置php函数swoole_process->read返回值*/ SW_ZVAL_STRINGL(return_value, buf, ret, 0); }
swoole_process.c static PHP_METHOD(swoole_process, write) { int ret; /*如下两种状况的本质都是调用write函数向process-pipe中写入数据*/ //async write if (SwooleG.main_reactor) { ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len); } else { ret = swSocket_write_blocking(process->pipe, data, data_len); } ZVAL_LONG(return_value, ret); }
swoole进程间使用套接字通讯的原理以下:
1. 父进程使用socketpair建立一对套接字
2. 建立子进程时,子进程继承了这对套接字
3. 父子进程使用系统的read,write函数对各自的套接字进行读写完成通讯。
4. 对于多个子进程,父进程实际上是为每一个子进程建立一对套接字用于通讯。
5. 子进程之间的通讯,好比A向B发消息,本质是fork A进程时,A从父进程处继承了向B发消息的套接字,从而完成了向B的通讯。编程
手册中说:默认的方式是流式。但从1.1节的__construct源码中咱们能够看到,默认使用的是SOCK_DGRAM方式swoole
此参数经__construct的第三参数传入,最终做用于socketpair的protocol字段socket
ret = socketpair(AF_UNIX, protocol, 0, object->socks);
在一般意义来讲SOCK_STREAM与SOCK_DGRAM分别用于tcp通讯和udp通讯,前者有序(先发先至),可靠;后者不保证顺序及数据可靠性。但在本地套接字中,因为是本机两进程通讯,不会涉及数据丢失,乱序等问题。那么这两个参数的区别在哪呢?
下面是我看到的一个很是清晰明了的解释:async
The difference between SOCK_STREAM and SOCK_DGRAM is in the semantics of consuming data out of the socket.tcp
Stream socket allows for reading arbitrary number of bytes, but still preserving byte sequence. In other words, a sender might write 4K of data to the socket, and the receiver can consume that data byte by byte. The other way around is true too - sender can write several small messages to the socket that the receiver can consume in one read. Stream socket does not preserve message boundaries.函数
Datagram socket, on the other hand, does preserve these boundaries - one write by the sender always corresponds to one read by the receiver (even if receiver’s buffer given to read(2) or recv(2) is smaller then that message).
也就是说SOCK_STREAM是流式的,数据没有消息边界,发送方屡次写入的数据,可能读取方一次就读取了。发送一次写入的数据,读取方可能分屡次才读完。回到swoole意味着这种方式下,write与read的次数并非一一对应的。你须要本身设置边界来切分消息。
SOCK_DGRAM方式,数据自然是有边界的,读写次数必定是一一对应的。回到swoole,意味着这种方式下,只要你的单条消息不超单次读写上限(默认8192字节),就不须要自行设置边界来切分消息。
看一个例子
<?php $process1 = new swoole_process(function($process){ $i = 1; while (true) { $msg = $process->read(); echo $msg,"\n"; echo "read $i time\n"; $i++; } }, false, 1); $num = 10; $process2 = new swoole_process(function($process) use ($process1, $num){ for ($i=0; $i<$num; $i++){ $msg = $process1->write("hello 1 i'm 2;"); if ($i % 5 == 0){ sleep(1); } } }); $process2->start(); $process1->start();
new第三参数设为1,使用SOCK_STREAM通讯。运行结果以下:
hello 1 i'm 2; read 1 time hello 1 i'm 2;hello 1 i'm 2;hello 1 i'm 2; read 2 time hello 1 i'm 2;hello 1 i'm 2; read 3 time hello 1 i'm 2;hello 1 i'm 2; read 4 time hello 1 i'm 2;hello 1 i'm 2; read 5 time
process2向process1写了10次数据,process1用了5次读完
将new第三参数设为2,使用SOCK_DGRAM通讯。运行结果以下:
hello 1 i'm 2; read 1 time hello 1 i'm 2; read 2 time hello 1 i'm 2; read 3 time hello 1 i'm 2; read 4 time hello 1 i'm 2; read 5 time hello 1 i'm 2; read 6 time hello 1 i'm 2; read 7 time hello 1 i'm 2; read 8 time hello 1 i'm 2; read 9 time hello 1 i'm 2; read 10 time
10次写对应10次读