Swoole Server中master进程投递数据到worker进程的性能优化

Swoole4.5版本中(目前还未发布),咱们的Server有一个性能须要优化的地方,就是worker进程在收到master进程发来的包的时候,须要进行两次的拷贝,才能够把数据从PHP扩展层传递到PHP上层(也就是咱们事件回调函数的data参数)。react

咱们先来分析一下为何会有性能的问题。首先,咱们须要一份会有性能问题的代码。咱们git cloneswoole-src代码,而后git checkout8235c82fea2130534a16fd20771dcab3408a763e这个commit位置:git

git checkout 8235c82fea2130534a16fd20771dcab3408a763e

咱们来分析一下代码,首先看master进程是如何封装数据而后发送给worker进程的。在函数process_send_packet里面,咱们看核心的地方:shell

static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;

    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);

        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;

    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            buf->info.len = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            buf->info.len = send_n;
        }

        memcpy(buf->data, data + offset, buf->info.len);

        if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
        {
            return SW_ERR;
        }

        send_n -= buf->info.len;
        offset += buf->info.len;
    }

    return SW_OK;
}

首先,咱们来讲一下process_send_packet这个函数的参数:服务器

其中,swoole

swServer *serv就是咱们建立的那个Serverapp

swPipeBuffer *buf指向的内存里面的数据须要发送给worker进程。函数

swSendData *resp里面存放了master进程收到的客户端数据以及一个swDataHead info头部。性能

_send是一个回调函数,这里面的逻辑就是master进程把swPipeBuffer *buf里面的数据发送给worker进程。fetch

void* private_data这里是一个swWorker *worker类型的指针转换过来的。指定了master进程须要发送的那个worker进程。大数据

说明一点,这里咱们是以 Server设置了 eof选项为例子讲解的(假设设置了 "\r\n")。由于 TCP是面向字节流的,即便客户端发送了一个很大的包过来,服务器一次 read出来的数据也不见得很是大。若是不设置 eof的话,是不会致使咱们这篇文章所说的性能问题。

介绍完了process_send_packet函数的参数以后,咱们来看看代码是如何实现的:

const char* data = resp->data;

首先,让data指向resp->data,也就是客户端发来的实际数据。例如,客户端发来了字符串hello world\r\n,那么data里面存放的就是hello world\r\n

uint32_t send_n = resp->info.len;

标志着resp->data数据的长度。例如,客户端往服务器发送了1M的数据,那么resp->info.len就是1048576

off_t offset = 0;

用来标志哪些数据master进程已经发送给了worker进程。

uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

max_length表示master进程一次往worker进程发送的包最大长度。

注意: master进程和 worker进程是经过 udg方式进行通讯的。因此, master进程发送多少, worker进程就直接收多少
if (send_n <= max_length)
{
    buf->info.flags = 0;
    buf->info.len = send_n;
    memcpy(buf->data, data, send_n);

    int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
    return retval;
}

若是master进程要发给worker进程的数据小于max_length,那么就直接调用_send函数,直接把数据发给worker进程。

buf->info.flags = SW_EVENT_DATA_CHUNK;

send_n大于max_length的时候,设置buf->info.flagsCHUNK,也就意味着须要把客户端发来的数据先拆分红一小段一小段的数据,而后再发送给worker进程。

while (send_n > 0)
{
    if (send_n > max_length)
    {
        buf->info.len = max_length;
    }
    else
    {
        buf->info.flags |= SW_EVENT_DATA_END;
        buf->info.len = send_n;
    }

    memcpy(buf->data, data + offset, buf->info.len);

    if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
    {
        return SW_ERR;
    }

    send_n -= buf->info.len;
    offset += buf->info.len;
}

逻辑比较简单,就是一个分段发送的过程。这里须要注意的两点:

一、buf->info.len的长度须要更新为小段的chunk的长度,而不是大数据包的长度
二、最后一个chunk的info.flags须要变成SW_EVENT_DATA_END,意味着一个完整的包已经发完了

OK,分析完了master进程发包的过程,咱们来分析一下worker进程收包的过程。

咱们先看一下函数swWorker_onPipeReceive

static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *buffer = serv->pipe_buffers[0];
    int ret;

    _read_from_pipe:

    if (read(event->fd, buffer, serv->ipc_max_size) > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) buffer);
        if (buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //no data
            if (ret < 0 && errno == EAGAIN)
            {
                return SW_OK;
            }
            else if (ret > 0)
            {
                goto _read_from_pipe;
            }
        }
        return ret;
    }

    return SW_ERR;
}

这个就是worker进程接收master进程发来的数据的代码。

咱们看的,worker进程会直接把数据先读取到buffer内存里面,而后调用swWorker_onTask。咱们再来看看swWorker_onTask函数:

int swWorker_onTask(swFactory *factory, swEventData *task)
{
    swServer *serv = (swServer *) factory->ptr;
    swWorker *worker = SwooleWG.worker;

    //worker busy
    worker->status = SW_WORKER_BUSY;
    //packet chunk
    if (task->info.flags & SW_EVENT_DATA_CHUNK)
    {
        if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
        {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                    "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
            return SW_OK;
        }
        //wait more data
        if (!(task->info.flags & SW_EVENT_DATA_END))
        {
            return SW_OK;
        }
    }

    switch (task->info.type)
    {
    case SW_SERVER_EVENT_SEND_DATA:
        //discard data
        if (swWorker_discard_data(serv, task) == SW_TRUE)
        {
            break;
        }
        swWorker_do_task(serv, worker, task, serv->onReceive);
        break;
    // 省略其余的case
    default:
        swWarn("[Worker] error event[type=%d]", (int )task->info.type);
        break;
    }

    //worker idle
    worker->status = SW_WORKER_IDLE;

    //maximum number of requests, process will exit.
    if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request)
    {
        swWorker_stop(worker);
    }
    return SW_OK;
}

咱们重点看看性能问题代码:

if (task->info.flags & SW_EVENT_DATA_CHUNK)
{
    if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
                "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
        return SW_OK;
    }
    //wait more data
    if (!(task->info.flags & SW_EVENT_DATA_END))
    {
        return SW_OK;
    }
}

这里,worker进程会先判断master发来的数据是不是CHUNK数据,若是是,那么会进行merge_chunk的操做。咱们看看merge_chunk对应的函数:

static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len)
{
    swString *package = swServer_worker_get_input_buffer(serv, key);
    //merge data to package buffer
    return swString_append_ptr(package, data, len);
}

咱们会先根据key的值(其实是reactor线程的id),获取一块全局的内存,而后把接收到的chunk数据,追加到这个全局的内存上面,而swString_append_ptr执行的就是memcpy的操做。

因此,这就是一个性能问题了。worker进程接收到的全部数据都会被完整的拷贝一遍。若是客户端发来的数据很大,这个拷贝的开销也是很大声的。

所以,咱们对这部分合并的代码进行了一个优化。咱们让worker进程在接收master进程的数据以前,就准备好一块足够大的内存,而后直接把master进程发来的数据下来便可。

咱们先更新一下swoole-src的源码:

git checkout 529ad44d578930b3607abedcfc278364df34bc73

咱们依旧先看看process_send_packet函数的代码:

static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
    const char* data = resp->data;
    uint32_t send_n = resp->info.len;
    off_t offset = 0;
    uint32_t copy_n;

    uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

    if (send_n <= max_length)
    {
        buf->info.flags = 0;
        buf->info.len = send_n;
        memcpy(buf->data, data, send_n);

        int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
        return retval;
    }

    buf->info.flags = SW_EVENT_DATA_CHUNK;
    buf->info.len = send_n;

    while (send_n > 0)
    {
        if (send_n > max_length)
        {
            copy_n = max_length;
        }
        else
        {
            buf->info.flags |= SW_EVENT_DATA_END;
            copy_n = send_n;
        }

        memcpy(buf->data, data + offset, copy_n);

        swTrace("finish, type=%d|len=%d", buf->info.type, copy_n);

        if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0)
        {
            return SW_ERR;
        }

        send_n -= copy_n;
        offset += copy_n;
    }

    return SW_OK;
}

咱们聚焦修改的地方,主要是对CHUNK的处理:

buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;

咱们发现,buf->info.len的长度不是每一个小段chunk的长度了,而是整个大包的长度了。为何能够这样作呢?由于master进程与worker进程是经过udg进行通讯的,因此,worker进程在调用recv的时候,返回值实际上就是chunk的长度了,因此buf->info.len里面存储chunk的长度没有必要。

其余地方的逻辑和以前的代码没有区别。

咱们再来看看worker进程是如何接收master进程发来的数据的。在函数swWorker_onPipeReceive里面:

static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
    int ret;
    ssize_t recv_n = 0;
    swServer *serv = (swServer *) reactor->ptr;
    swFactory *factory = &serv->factory;
    swPipeBuffer *pipe_buffer = serv->pipe_buffers[0];
    void *buffer;
    struct iovec buffers[2];

    // peek
    recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    else if (recv_n < 0)
    {
        return SW_ERR;
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        buffer = serv->get_buffer(serv, &pipe_buffer->info);
        _read_from_pipe:

        buffers[0].iov_base = &pipe_buffer->info;
        buffers[0].iov_len = sizeof(pipe_buffer->info);
        buffers[1].iov_base = buffer;
        buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

        recv_n = readv(event->fd, buffers, 2);
        if (recv_n < 0 && errno == EAGAIN)
        {
            return SW_OK;
        }
        if (recv_n > 0)
        {
            serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
        }

        if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
        {
            //wait more chunk data
            if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
            {
                goto _read_from_pipe;
            }
            else
            {
                pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
                /**
                 * Because we don't want to split the swEventData parameters into swDataHead and data,
                 * we store the value of the worker_buffer pointer in swEventData.data.
                 * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                 */
                serv->copy_buffer_addr(serv, pipe_buffer);
            }
        }
    }
    else
    {
        recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size);
    }

    if (recv_n > 0)
    {
        ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info));
        return ret;
    }

    return SW_ERR;
}

其中,

recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN)
{
    return SW_OK;
}
else if (recv_n < 0)
{
    return SW_ERR;
}

咱们先对内核缓冲区里面的数据进行一次peek操做,来获取到head部分。这样咱们就知道数据是不是以CHUNK方式发来的了。

if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
    buffer = serv->get_buffer(serv, &pipe_buffer->info);
    _read_from_pipe:

    buffers[0].iov_base = &pipe_buffer->info;
    buffers[0].iov_len = sizeof(pipe_buffer->info);
    buffers[1].iov_base = buffer;
    buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

    recv_n = readv(event->fd, buffers, 2);
    if (recv_n < 0 && errno == EAGAIN)
    {
        return SW_OK;
    }
    if (recv_n > 0)
    {
        serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
    }

    if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
    {
        //wait more chunk data
        if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
        {
            goto _read_from_pipe;
        }
        else
        {
            pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
            /**
                * Because we don't want to split the swEventData parameters into swDataHead and data,
                * we store the value of the worker_buffer pointer in swEventData.data.
                * The value of this pointer will be fetched in the swServer_worker_get_packet function.
                */
            serv->copy_buffer_addr(serv, pipe_buffer);
        }
    }
}

若是是CHUNK方式发来的数据,那么咱们执行以下的操做:

buffer = serv->get_buffer(serv, &pipe_buffer->info);

get_buffer是一个回调函数,对应:

static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info)
{
    swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id);

    if (worker_buffer->size < info->len)
    {
        swString_extend(worker_buffer, info->len);
    }

    return worker_buffer->str + worker_buffer->length;
}

这里,咱们会先判断这块全局的buffer是否足够的大,能够接收完整个大包,若是不够大,咱们扩容到足够的大。

_read_from_pipe:

buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);

而后,咱们调用readv,把head和实际的数据分别存在了两个地方。这么作是避免为了把head和实际的数据作拆分而致使的内存拷贝。

经过以上方式,Swoole Server减小了一次内存拷贝。

相关文章
相关标签/搜索