在Swoole4.5
版本中(目前还未发布),咱们的Server
有一个性能须要优化的地方,就是worker
进程在收到master
进程发来的包的时候,须要进行两次的拷贝,才能够把数据从PHP
扩展层传递到PHP
上层(也就是咱们事件回调函数的data
参数)。react
咱们先来分析一下为何会有性能的问题。首先,咱们须要一份会有性能问题的代码。咱们git clone
下swoole-src
代码,而后git checkout
到8235c82fea2130534a16fd20771dcab3408a763e
这个commit
位置:git
git checkout 8235c82fea2130534a16fd20771dcab3408a763e
咱们来分析一下代码,首先看master
进程是如何封装数据而后发送给worker
进程的。在函数process_send_packet
里面,咱们看核心的地方:shell
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data) { const char* data = resp->data; uint32_t send_n = resp->info.len; off_t offset = 0; uint32_t max_length = serv->ipc_max_size - sizeof(buf->info); if (send_n <= max_length) { buf->info.flags = 0; buf->info.len = send_n; memcpy(buf->data, data, send_n); int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data); return retval; } buf->info.flags = SW_EVENT_DATA_CHUNK; while (send_n > 0) { if (send_n > max_length) { buf->info.len = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; buf->info.len = send_n; } memcpy(buf->data, data + offset, buf->info.len); if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0) { return SW_ERR; } send_n -= buf->info.len; offset += buf->info.len; } return SW_OK; }
首先,咱们来讲一下process_send_packet
这个函数的参数:服务器
其中,swoole
swServer *serv
就是咱们建立的那个Server
。app
swPipeBuffer *buf
指向的内存里面的数据须要发送给worker
进程。函数
swSendData *resp
里面存放了master
进程收到的客户端数据以及一个swDataHead info
头部。性能
_send
是一个回调函数,这里面的逻辑就是master
进程把swPipeBuffer *buf
里面的数据发送给worker
进程。fetch
void* private_data
这里是一个swWorker *worker
类型的指针转换过来的。指定了master
进程须要发送的那个worker
进程。大数据
说明一点,这里咱们是以Server
设置了eof
选项为例子讲解的(假设设置了"\r\n"
)。由于TCP
是面向字节流的,即便客户端发送了一个很大的包过来,服务器一次read
出来的数据也不见得很是大。若是不设置eof
的话,是不会致使咱们这篇文章所说的性能问题。
介绍完了process_send_packet
函数的参数以后,咱们来看看代码是如何实现的:
const char* data = resp->data;
首先,让data
指向resp->data
,也就是客户端发来的实际数据。例如,客户端发来了字符串hello world\r\n
,那么data
里面存放的就是hello world\r\n
。
uint32_t send_n = resp->info.len;
标志着resp->data
数据的长度。例如,客户端往服务器发送了1M
的数据,那么resp->info.len
就是1048576
。
off_t offset = 0;
用来标志哪些数据master
进程已经发送给了worker
进程。
uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);
max_length
表示master
进程一次往worker
进程发送的包最大长度。
注意:master
进程和worker
进程是经过udg
方式进行通讯的。因此,master
进程发送多少,worker
进程就直接收多少
if (send_n <= max_length) { buf->info.flags = 0; buf->info.len = send_n; memcpy(buf->data, data, send_n); int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data); return retval; }
若是master
进程要发给worker
进程的数据小于max_length
,那么就直接调用_send
函数,直接把数据发给worker
进程。
buf->info.flags = SW_EVENT_DATA_CHUNK;
当send_n
大于max_length
的时候,设置buf->info.flags
为CHUNK
,也就意味着须要把客户端发来的数据先拆分红一小段一小段的数据,而后再发送给worker
进程。
while (send_n > 0) { if (send_n > max_length) { buf->info.len = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; buf->info.len = send_n; } memcpy(buf->data, data + offset, buf->info.len); if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0) { return SW_ERR; } send_n -= buf->info.len; offset += buf->info.len; }
逻辑比较简单,就是一个分段发送的过程。这里须要注意的两点:
一、buf->info.len的长度须要更新为小段的chunk的长度,而不是大数据包的长度 二、最后一个chunk的info.flags须要变成SW_EVENT_DATA_END,意味着一个完整的包已经发完了
OK
,分析完了master
进程发包的过程,咱们来分析一下worker
进程收包的过程。
咱们先看一下函数swWorker_onPipeReceive
:
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event) { swServer *serv = (swServer *) reactor->ptr; swFactory *factory = &serv->factory; swPipeBuffer *buffer = serv->pipe_buffers[0]; int ret; _read_from_pipe: if (read(event->fd, buffer, serv->ipc_max_size) > 0) { ret = swWorker_onTask(factory, (swEventData *) buffer); if (buffer->info.flags & SW_EVENT_DATA_CHUNK) { //no data if (ret < 0 && errno == EAGAIN) { return SW_OK; } else if (ret > 0) { goto _read_from_pipe; } } return ret; } return SW_ERR; }
这个就是worker
进程接收master
进程发来的数据的代码。
咱们看的,worker
进程会直接把数据先读取到buffer
内存里面,而后调用swWorker_onTask
。咱们再来看看swWorker_onTask
函数:
int swWorker_onTask(swFactory *factory, swEventData *task) { swServer *serv = (swServer *) factory->ptr; swWorker *worker = SwooleWG.worker; //worker busy worker->status = SW_WORKER_BUSY; //packet chunk if (task->info.flags & SW_EVENT_DATA_CHUNK) { if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA, "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len); return SW_OK; } //wait more data if (!(task->info.flags & SW_EVENT_DATA_END)) { return SW_OK; } } switch (task->info.type) { case SW_SERVER_EVENT_SEND_DATA: //discard data if (swWorker_discard_data(serv, task) == SW_TRUE) { break; } swWorker_do_task(serv, worker, task, serv->onReceive); break; // 省略其余的case default: swWarn("[Worker] error event[type=%d]", (int )task->info.type); break; } //worker idle worker->status = SW_WORKER_IDLE; //maximum number of requests, process will exit. if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request) { swWorker_stop(worker); } return SW_OK; }
咱们重点看看性能问题代码:
if (task->info.flags & SW_EVENT_DATA_CHUNK) { if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA, "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len); return SW_OK; } //wait more data if (!(task->info.flags & SW_EVENT_DATA_END)) { return SW_OK; } }
这里,worker
进程会先判断master
发来的数据是不是CHUNK
数据,若是是,那么会进行merge_chunk
的操做。咱们看看merge_chunk
对应的函数:
static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len) { swString *package = swServer_worker_get_input_buffer(serv, key); //merge data to package buffer return swString_append_ptr(package, data, len); }
咱们会先根据key
的值(其实是reactor
线程的id
),获取一块全局的内存,而后把接收到的chunk
数据,追加到这个全局的内存上面,而swString_append_ptr
执行的就是memcpy
的操做。
因此,这就是一个性能问题了。worker
进程接收到的全部数据都会被完整的拷贝一遍。若是客户端发来的数据很大,这个拷贝的开销也是很大声的。
所以,咱们对这部分合并的代码进行了一个优化。咱们让worker
进程在接收master
进程的数据以前,就准备好一块足够大的内存,而后直接把master
进程发来的数据下来便可。
咱们先更新一下swoole-src
的源码:
git checkout 529ad44d578930b3607abedcfc278364df34bc73
咱们依旧先看看process_send_packet
函数的代码:
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data) { const char* data = resp->data; uint32_t send_n = resp->info.len; off_t offset = 0; uint32_t copy_n; uint32_t max_length = serv->ipc_max_size - sizeof(buf->info); if (send_n <= max_length) { buf->info.flags = 0; buf->info.len = send_n; memcpy(buf->data, data, send_n); int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data); return retval; } buf->info.flags = SW_EVENT_DATA_CHUNK; buf->info.len = send_n; while (send_n > 0) { if (send_n > max_length) { copy_n = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; copy_n = send_n; } memcpy(buf->data, data + offset, copy_n); swTrace("finish, type=%d|len=%d", buf->info.type, copy_n); if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0) { return SW_ERR; } send_n -= copy_n; offset += copy_n; } return SW_OK; }
咱们聚焦修改的地方,主要是对CHUNK
的处理:
buf->info.flags = SW_EVENT_DATA_CHUNK; buf->info.len = send_n;
咱们发现,buf->info.len
的长度不是每一个小段chunk
的长度了,而是整个大包的长度了。为何能够这样作呢?由于master
进程与worker
进程是经过udg
进行通讯的,因此,worker
进程在调用recv
的时候,返回值实际上就是chunk
的长度了,因此buf->info.len
里面存储chunk
的长度没有必要。
其余地方的逻辑和以前的代码没有区别。
咱们再来看看worker
进程是如何接收master
进程发来的数据的。在函数swWorker_onPipeReceive
里面:
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event) { int ret; ssize_t recv_n = 0; swServer *serv = (swServer *) reactor->ptr; swFactory *factory = &serv->factory; swPipeBuffer *pipe_buffer = serv->pipe_buffers[0]; void *buffer; struct iovec buffers[2]; // peek recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } else if (recv_n < 0) { return SW_ERR; } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { buffer = serv->get_buffer(serv, &pipe_buffer->info); _read_from_pipe: buffers[0].iov_base = &pipe_buffer->info; buffers[0].iov_len = sizeof(pipe_buffer->info); buffers[1].iov_base = buffer; buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info); recv_n = readv(event->fd, buffers, 2); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } if (recv_n > 0) { serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info)); } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { //wait more chunk data if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) { goto _read_from_pipe; } else { pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR; /** * Because we don't want to split the swEventData parameters into swDataHead and data, * we store the value of the worker_buffer pointer in swEventData.data. * The value of this pointer will be fetched in the swServer_worker_get_packet function. */ serv->copy_buffer_addr(serv, pipe_buffer); } } } else { recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size); } if (recv_n > 0) { ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info)); return ret; } return SW_ERR; }
其中,
recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } else if (recv_n < 0) { return SW_ERR; }
咱们先对内核缓冲区里面的数据进行一次peek
操做,来获取到head
部分。这样咱们就知道数据是不是以CHUNK
方式发来的了。
if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { buffer = serv->get_buffer(serv, &pipe_buffer->info); _read_from_pipe: buffers[0].iov_base = &pipe_buffer->info; buffers[0].iov_len = sizeof(pipe_buffer->info); buffers[1].iov_base = buffer; buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info); recv_n = readv(event->fd, buffers, 2); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } if (recv_n > 0) { serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info)); } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { //wait more chunk data if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) { goto _read_from_pipe; } else { pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR; /** * Because we don't want to split the swEventData parameters into swDataHead and data, * we store the value of the worker_buffer pointer in swEventData.data. * The value of this pointer will be fetched in the swServer_worker_get_packet function. */ serv->copy_buffer_addr(serv, pipe_buffer); } } }
若是是CHUNK
方式发来的数据,那么咱们执行以下的操做:
buffer = serv->get_buffer(serv, &pipe_buffer->info);
get_buffer
是一个回调函数,对应:
static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info) { swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id); if (worker_buffer->size < info->len) { swString_extend(worker_buffer, info->len); } return worker_buffer->str + worker_buffer->length; }
这里,咱们会先判断这块全局的buffer
是否足够的大,能够接收完整个大包,若是不够大,咱们扩容到足够的大。
_read_from_pipe: buffers[0].iov_base = &pipe_buffer->info; buffers[0].iov_len = sizeof(pipe_buffer->info); buffers[1].iov_base = buffer; buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info); recv_n = readv(event->fd, buffers, 2);
而后,咱们调用readv
,把head
和实际的数据分别存在了两个地方。这么作是避免为了把head
和实际的数据作拆分而致使的内存拷贝。
经过以上方式,Swoole Server
减小了一次内存拷贝。