【Redis5源码学习】浅析redis中的IO多路复用与事件机制

baiyanredis

引入

读这篇文章以前请先阅读:浅析服务器并发IO性能提高之路—从网络编程基础到epoll,以更好的理解本文的内容,谢谢。
咱们知道,咱们在使用redis的时候,经过客户端发送一个get命令,就可以获得redis服务端返回的数据。redis是基于传统的C/S架构实现的。它经过监听一个TCP端口(6379)的方式来接收来自客户端的链接,从而进行后续命令的执行,并把执行结果返回给客户端。编程

redis是一个合格的服务端程序

咱们先思考一个问题:做为一个合格的服务端程序,咱们在命令行输入一个get命令以后,redis服务端是怎么处理这个命令,并把结果返回给客户端的呢?
要回答这个问题,咱们先回顾上一篇文章中讲过的,客户端与服务器须要分别建立一个套接字代表本身所在的网络地址与端口号,而后基于TCP协议来进行套接字之间的通讯。一般状况下,一个服务端程序的socket通讯流程以下:segmentfault

int main(int argc, char *argv[]) {
    listenSocket = socket(); //调用socket()系统调用建立一个监听套接字描述符
    bind(listenSocket);  //绑定地址与端口
    listen(listenSocket); //由默认的主动套接字转换为服务器适用的被动套接字
    while (1) { //不断循环去监听是否有客户端链接事件到来
        connSocket = accept($listenSocket); //接受客户端链接
        read(connsocket); //从客户端读取数据,只能同时处理一个客户端
        write(connsocket); //返回给客户端数据,只能同时处理一个客户端
    }
    return 0;
}

在redis中,一样要通过以上几个步骤。与客户端创建链接以后,就会读取客户端发来的命令,而后执行命令,最后经过调用write系统调用,将命令的执行结果返回给客户端。
可是这样一个进程只能同时处理一个客户端的链接与读写事件。为了让单进程的服务端应用同时处理多个客户端的事件,咱们采用了IO多路复用机制。目前最好的IO多路复用机制就是epoll。回顾咱们上一篇文章中最终使用epoll建立的服务器代码:api

int main(int argc, char *argv[]) {

    listenSocket = socket(AF_INET, SOCK_STREAM, 0); //同上,建立一个监听套接字描述符
    
    bind(listenSocket)  //同上,绑定地址与端口
    
    listen(listenSocket) //同上,由默认的主动套接字转换为服务器适用的被动套接字
    
    epfd = epoll_create(EPOLL_SIZE); //建立一个epoll实例
    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //建立一个epoll_event结构存储套接字集合
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //将监听套接字加入到监听列表中
    
    while (1) {
    
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已经就绪的套接字描述符们
        
        for (int i = 0; i < event_cnt; ++i) { //遍历全部就绪的套接字描述符
            if (ep_events[i].data.fd == listenSocket) { //若是是监听套接字描述符就绪了,说明有一个新客户端链接到来
            
                connSocket = accept(listenSocket); //调用accept()创建链接
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加对新创建的链接套接字描述符的监听,以监听后续在链接描述符上的读写事件
                
            } else { //若是是链接套接字描述符事件就绪,则能够进行读写
            
                strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //从链接套接字描述符中读取数据, 此时必定会读到数据,不会产生阻塞
                if (strlen == 0) { //已经没法从链接套接字中读到数据,须要移除对该socket的监听
                
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //删除对这个描述符的监听
                    
                    close(ep_events[i].data.fd);
                } else {
                    write(ep_events[i].data.fd, buf, str_len); //若是该客户端可写 把数据写回到客户端
                }
            }
        }
    }
    close(listenSocket);
    close(epfd);
    return 0;
}

redis基于原有的select、poll与epoll机制,结合本身独特的业务需求,封装了本身的一套事件处理函数,咱们把它叫作ae(a simple event-driven programming library)。而redis具体使用select、epoll仍是mac上的kqueue技术,redis会首先进行判断,而后选择性能最优的那个:数组

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

由于 select 函数是做为 POSIX 标准中的系统调用,在不一样版本的操做系统上都会实现,因此将其做为兜底方案。为了讲述方便,后面的文章均使用epoll机制来说解。服务器

redis中的IO多路复用

当咱们在命令行中启动一个redis-server的时候,redis其实作了和咱们以前写的epoll服务器相似的操做,重点的函数调用有如下三个:网络

int main(int argc, char **argv) {
    ...
    initServerConfig(); //初始化存储服务端信息的结构体
    ...
    initServer(); //初始化redis事件循环并调用epoll_create与epoll_ctl。建立socket、bind、listen、accept都在这个函数中进行调用,并注册调用后返回的监听描述符和链接描述符
    ...
    aeMain(); //执行while(1)事件循环,并调用epoll_wait获取已就绪的描述符,并调用对应的handler
    ...
}

接下来咱们一个一个来看:架构

initServerConfig()

redis服务端的全部信息都存储在一个redisServer结构体中,这个结构体字段很是多,好比服务端的套接字信息(如地址和端口),还有不少支持redis其余功能如集群、持久化等的配置信息都存储在这个结构体中。这个函数调用就是对redisServer结构体的全部字段进行初始化并赋一个初始值。因为咱们此次讲解的是事件与IO多路复用机制在redis中的应用,因此咱们只关注其中的几个字段便可。并发

initServer()

这个函数调用是咱们的重中之重。初始化完服务器的相关信息以后,就须要进行套接字的建立、绑定、监听并与客户端创建链接了。在这个函数中,进行了咱们常说的建立socket、bind、listen、accept、epoll_create、epoll_ctl调用,咱们能够对照上文的epoll服务器,逐步了解redis的事件机制。initServer()的主要函数调用以下:socket

void initServer(void) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 
    ...

    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
       }
    }
    ...
}

咱们按照从上到下的顺序解读这几行关键代码:

aeCreateEventLoop()

在redis中,有一个aeEventLoop的概念,它来管理全部相关的事件描述字段、存储已注册的事件、已就绪的事件:

typedef struct aeEventLoop {
    int stop; //标识事件循环(即while(1))是否结束
    
    aeFileEvent *events;  //存储已经注册的文件事件(文件事件即客户端链接与读写事件)
    aeFiredEvent *fired;  //存储已就绪的文件事件
    aeTimeEvent *timeEventHead; //存储时间事件(时间事件后面再讲)
    
    void *apidata; /* 存储epoll相关信息 */
    
    aeBeforeSleepProc *beforesleep; //事件发生前须要调用的函数
    aeBeforeSleepProc *aftersleep; //事件发生后须要调用的函数
} aeEventLoop;

redis将全部经过epoll_wait()返回的就绪描述符都存储在fired数组中,而后遍历这个数组,并调用对应的事件处理函数,一次性处理完全部事件。在aeCreateEventLoop()函数中,对这个管理全部事件信息的结构体字段进行了初始化,这里面也包括调用epoll_create(),对epoll的epfd进行初始化:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err; //调用aeApiCreate(),内部会调用epoll_create()
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}

在aeApiCreate()函数中,调用了epoll_create(),并将建立好的epfd放到eventLoop结构体的apidata字段保管:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 调用epoll_create初始化epoll的epfd */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state; //将建立好的epfd放到eventLoop结构体的apidata字段保管
    return 0;
}

listenToPort()

在建立完epfd以后,咱们就要进行socket建立、绑定、监听的操做了,这几步在listenToPort()函数来进行:

int listenToPort(int port, int *fds, int *count) {
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) { //遍历全部的ip地址
        if (server.bindaddr[j] == NULL) { //尚未绑定地址
           ...
        } else if (strchr(server.bindaddr[j],':')) { //绑定IPv6地址
            ...
        } else { //绑定IPv4地址,通常会进到这个if分支中
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);  //真正的绑定逻辑
        }
        ...
    }
    return C_OK;
}

redis会先进行绑定ip地址类型的判断,咱们通常是IPv4,因此通常会走到第三个分支,调用anetTcpServer()函数来进行具体的绑定逻辑:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
   ...
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) //调用socket()建立一个监听套接字
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; //调用bind()与listen()绑定端口并转化为服务端被动套接字
        goto end;
    }
}

在调用socket()系统调用建立了套接字以后,须要进一步调用bind()与listen(),这两步是在anetListen()函数内部实现的:

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) { //调用bind()绑定端口
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    if (listen(s, backlog) == -1) { //调用listen()将主动套接字转换为被动监听套接字
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

看到这里,咱们知道redis和咱们写过的epoll服务器同样,都是须要进行套接字建立、绑定、监听的过程。

aeCreateFileEvent

在redis中,把客户端链接事件、读写事件统称为文件事件。咱们刚才完成了socket建立、bind、listen的过程。目前咱们已经有了一个监听描述符,那么咱们须要首先将监听描述符添加到epoll的监听列表,以监听客户端的链接事件。在initServer()中,经过调用aeCreateFileEvent(),同时指定了它的事件处理函数acceptTcpHandler()来实现对客户端链接事件的处理:

for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }

跟进aeCreateFileEvent()函数,发现其内部进一步调用了aeApiAddEvent()函数:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; 
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; //调用epoll_ctl添加客户端链接事件
    return 0;
}

aeApiAddEvent函数会调用epoll_ctl(),将客户端链接事件添加到监听列表。同时,redis会将该事件的处理函数放到aeFileEvent结构体中进行存储:

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; //读事件处理程序
    aeFileProc *wfileProc; //写事件处理程序
    void *clientData;  //客户端数据
} aeFileEvent;

对照以前咱们写过的epoll服务端程序,咱们已经实现了如下几个步骤:

int main(int argc, char *argv[]) {

    listenSocket = socket(AF_INET, SOCK_STREAM, 0); //建立一个监听套接字描述符
    
    bind(listenSocket)  //绑定地址与端口
    
    listen(listenSocket) //由默认的主动套接字转换为服务器适用的被动套接字
    
    epfd = epoll_create(EPOLL_SIZE); //建立一个epoll实例
    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //建立一个epoll_event结构存储套接字集合
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //将监听套接字加入到监听列表中
   ...
}

咱们已经实现了对套接字的建立、bind、listen,已经过epoll_create()实现了epfd的建立,并将初始的监听套接字描述符事件添加到了epoll的监听列表中,并为他指定了事件处理函数。下一步,就应该到了while(1)循环调用epoll_wait()的阶段了。经过阻塞调用epoll_wait(),返回全部已经就绪的套接字描述符,触发相应事件,而后对事件进行处理。

aeMain()

最后就是经过while(1)循环,等待客户端链接事件的到来啦:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

在eventLoop中,采用stop标志来断定循环是否结束。若是没有结束,那么循环调用aeProcessEvents()。咱们猜想,这里面就调用了epoll_wait(),阻塞等待事件的到来,而后遍历全部就绪的套接字描述符,而后调用对应的事件处理函数便可:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        numevents = aeApiPoll(eventLoop, tvp); //调用epoll_wait()
        ...
}

咱们跟进aeApiPoll,来看看epoll_wait()是如何调用的:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata; //
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

首先从eventLoop中拿出以前在aeApiCreate()中建立的epfd与已经注册的事件集合,调用epoll_wait()等待事件们的到来,并返回全部就绪事件的描述符集合。随后,遍历全部就绪的描述符集合,判断它是什么类型的描述符,是可读仍是可写的,随后将全部就绪可处理的事件存储到eventLoop中的fired数组中,并把相应数组位置上的可读仍是可写标记也一并存储。
回到外部调用处,咱们如今已经把全部可以处理的事件都放到了fired数组中,那么咱们就能够经过遍历这个数组,拿到全部能够处理的事件,而后调用对应的事件处理函数:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        numevents = aeApiPoll(eventLoop, tvp); //调用epoll_wait()

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; //循环拿出全部就绪的事件
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; 

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask); //若是该事件是读事件,调用读事件处理函数
                fired++;
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask); //若是该事件是写事件,调用写事件处理函数
                    fired++;
                }
            }
        }
    }
    ...
}

至于如何区分是客户端链接事件以及读写事件,redis经过指定不一样的事件处理函数(如accept事件是acceptTcpHandler事件处理函数),读或写事件又是其余的事件处理函数。经过这层封装,免去了判断套接字描述符类型的步骤,直接调用以前注册的事件处理函数便可、
回顾咱们以前写过的的epoll服务器,是否是和这一段代码很类似呢?

while (1) {
    
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已经就绪的套接字描述符们
        
        for (int i = 0; i < event_cnt; ++i) { //遍历全部就绪的套接字描述符
            if (ep_events[i].data.fd == listenSocket) { //若是是监听套接字描述符就绪了,说明有一个新客户端链接到来
            
                connSocket = accept(listenSocket); //调用accept()创建链接
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加对新创建的链接套接字描述符的监听,以监听后续在链接描述符上的读写事件
                
            } else { //若是是链接套接字描述符事件就绪,则能够进行读写
            
                strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //从链接套接字描述符中读取数据, 此时必定会读到数据,不会产生阻塞
                if (strlen == 0) { //已经没法从链接套接字中读到数据,须要移除对该socket的监听
                
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //删除对这个描述符的监听
                    
                    close(ep_events[i].data.fd);
                } else {
                    write(ep_events[i].data.fd, buf, str_len); //若是该客户端可写 把数据写回到客户端
                }
            }
        }
    }

总结

至此,咱们就掌握了redis中的IO多路复用场景。redis把全部链接与读写事件、还有咱们没提到的时间事件一块儿集中管理,并对底层IO多路复用机制进行了封装,最终实现了单进程可以处理多个链接以及读写事件。这就是IO多路复用在redis中的应用。

相关文章
相关标签/搜索