长平狐 memcached源代码阅读笔记(二):网络处理部分

Java、PHP、Ruby、iOS、Python 等 JetBrains 开发工具低至 99 元(3折),详情» html

既然memcached是一个缓存服务器,并且要提供高效的缓存服务,那么网络层确定要很是 有效率才行。要能支撑大量的并发链接,还要有很优秀的响应速度。除此以外,由于memcached的核心业务并非网络层,它的核心是缓存机制。那么就必 须采用一种机制,将网络层隔离,以避免网络通讯部分缠绕在系统的各处,扰乱了核心逻辑。 java

在这一点上要感谢基于事件驱动的网络库libevent。memcached就是采用这个来做为它的网络层,因此对于memcache来讲,即便有成千上万的链接处理起来也不是什么难事。 linux

libevent的事件驱动机制除了能提升网络处理的效率外,还抽象了各个操做系统上最高效的方式:好比Linux上的epoll, FreeBSD上的kqueue 以及Windows的IOCP。就不说跨平台了,单单使用好其中一种机制就不是一件容易事。除此以外,使用事件驱动的机制,还能让你将网络处理部分与业务 逻辑相分离。  编程

好,今天咱们就来看看memcached是如何利用libevent构建其网络层的(注意,memcache同时支持TCP和UDP,不过本文只会关注TCP部分)。 windows

为了更好的理解libevent,咱们先来看看linux上如何使用epoll实现基于事件的网络编程:  缓存

#define MAX_EVENTS 10
main(){
         int sfd;
         int flags;
         int efd;
         int nfds;
         int i;
         struct epoll_event ev, events[MAX_EVENTS];
         struct addrinfo *ai;
         struct addrinfo hints = {.ai_flags = AI_PASSIVE,
                                .ai_family = AF_UNSPEC};
        hints.ai_socktype = SOCK_STREAM;
         char port_buf[NI_MAXSERV];
        snprintf(port_buf,  sizeof(port_buf),  " %d ", port);
        getaddrinfo(NULL, port_buf, &hints, &ai);
         // create a socket to linsten
        sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        flags = fcntl(sfd, F_GETFL,  0);
         // set nob block
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
         // bind
        bind(sfd, ai->ai_addr, ai->ai_addrlen);
         // listen
        listen(sfd,  15);
         // create epoll
        efd = epoll_create( 10);
        ev.events = EPOLLIN;
        ev.data.fd = sfd;
         // setup epoll
        epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ev);
         while( true){
                 // wait, block until a new connection is comming
                nfds = epoll_wait(efd, events, MAX_EVENTS, - 1);
                 for(i =  0; i < nfds; ++i){
                   dispatch(events[i]);
                }
        }

服务器

我在上面的代码里加了少许的注释。代码的前半部分没有什么,和全部的网络编程同样,就是建立套接字,而后bind到地址,开始监听。注意,这里咱们 将这个文件描述符设置为NON_BLOCK的,这一点很关键。而后咱们建立epoll,设置感兴趣的事件,而后在一个循环里wait事件的发生。 wait会阻塞,当一个事件发生时会继续运行,而后根据事件的类型做出不一样的处理。 网络

可能有人要问,我没有发现任何事件触发的意思啊,这跟.NET里咱们熟悉的事件处理差异太大了: 数据结构

btnOK.OnClick +=  delegate(sender, e){
    // ...

并发

不过若是你熟悉一点Win32编程的话就不那么认为了。不熟悉的话确定据说过消息循环吧。想一想看,上面的代码是否是跟Win32里的消息循环很是类 似。一个loop,而后接受事件,翻译事件,而后根据事件的类型分发给事件处理句柄。只不过在Win32里编写窗体程序时,事件的来源是鼠标点击按钮或者 键盘操做,而这里的事件来源是网络:多是一个新的链接到来,也多是读缓冲区里数据已经准备好(要了解Win32消息循环的更多细节能够参看我这篇文 章:点击这里)。至于为什么基于事件的这种机制对于大并发的系统颇有用不在本文的范围内,若是要阐述清楚这个可能须要介绍如下IO模型等问题,不过你也能够经过阅读我以前写的并行和异步了解一些概念。

不过要让应用开发人员都亲力亲为的处理消息循环来作事件驱动的编程太过于麻烦了,不论是后来的MFC,仍是VB抑或是WinForm中,那个消息循 环再也没出现过。取而代之的是如今好用的事件。网络编程也同样,要亲力亲为的处理这种细节,太麻烦了,并且太容易出错。最麻烦的还有各个平台提供的机制还 不同,如是相似libevent这样的类库就应运而生。 

如今咱们来看看memcache如何使用libevent进行高效的网络编程:

//event_base 是libevent的核心数据结构

staticstruct event_base *main_base;
main_base = event_init();
//而后就是建立scoket等
fd = socket(...)
bind()
listen()
//调用conn_new方法
conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,
                                             transport, main_base);

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    c->sfd = sfd;
    c->state = init_state;
    //设置感兴趣的事件,事件句柄是event_handler
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    event_add(&c->event0);
}
void event_handler(const int fd, const short which, void *arg) {
    drive_machine(c);
}
static void drive_machine(conn *c) {
 while (!stop) {
        switch(c->state) {
        case conn_listening:
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
           dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);

}

为了代码更清晰,我删除了错误处理等代码。上面的代码从构建libevent的核心数据结构event_base开始,而后建立socket,给该 socket设置感兴趣的事件,而且设置了事件处理句柄event_handler,而后咱们根据conn数据结构的state字段判断如今是什么事件并 做出相应的处理(conn数据结构是memcache里处理网络链接的核心数据结构之一)。第一步固然是一个新的链接的到来:链接到了memcache服 务器,这是第一个事件。而后咱们经过dispatch_conn_new函数分发这个新到来的链接。在链接到来以后咱们就要进入下一步处理了:好比从缓冲 区读取数据,在这里就是接收memcache客户端发过来的各类命令,或者向缓冲区写出数据,好比咱们接到一个get命令,而后咱们就从缓存里读取相应的 key对应的值,而后将该值写到缓冲区(关于memcache协议(命令)处理的内容后面的文章会有介绍,本文咱们只关注网络处理部分)。

不知道刚才你阅读过我那篇Win32消息循环的文章没,在窗体编程中,为了在作耗时操做时不让界面假死咱们经常使用的作法就是建立一个有别于消息循环的 主线程,而后在这个线程里处理这些耗时操做。这里也同样,当链接到来后,咱们要读命令,解析命令,处理命令。为了避免让这些操做阻塞了主线程,即监听链接到 来的线程(想一想若是阻塞告终果会是怎样?),memcache也是相似的处理方法:主线程接收新链接,而后建立一些线程(可配置的)来处理命令。好,咱们 仍是来看代码吧:

thread_init(settings.num_threads, main_base);
void thread_init( int nthreads,  struct event_base *main_base) {
    threads = calloc(nthreads,  sizeof(LIBEVENT_THREAD));

    dispatcher_thread. base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    //这里很巧妙
     for (i =  0; i < nthreads; i++) {

        //建立一个管道,管道对应两个描述符,一个用于写,一个用于读。 

        int fds[2];
        pipe(fds);

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        stats.reserved_fds += 5;
    }

    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
   //只有全部的线程都初始化完毕后,这里才会继续执行,不然阻塞
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();

    //在这里,咱们在管道的读端注册感兴趣的事件 

    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    event_add(&me->notify_event, 0);
}
//事件处理函数,这里又调用了conn_new函数
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;

        //从队列里取出要处理的链接

        item = cq_pop(me->new_conn_queue);
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
      
}
//建立worker线程用来处理命令等
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    ret = pthread_create(&thread, &attr, func, arg);

 上面的代码会在memcache初始化的时候就会执行,会建立一堆线程等待事件处理。那么是用什么方式将事件传递给这些线程呢,这里实现有点巧妙。注意到这几行代码:

pipe(fds); 

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

这里建立了一个管道(pipe),管道对应两个描述符,一个用于写,一个读。再来看看前面一段代码里咱们没有贴出的dispatch_conn_new函数:

void  dispatch_conn_new( int  sfd,  enum  conn_states init_state,  int  event_flags, int read_buffer_size,  enum network_transport transport) {

    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    //将链接放到一个队列里
    cq_push(thread->new_conn_queue, item); 

    //向管道的写端写一个字节的数字,人为的触发事件 

    write(thread->notify_send_fd, ""1);

有意思的代码就在最后一行,在接收到一个链接以后,从刚才建立的一些LIBEVENT_THREAD里,选一个。这个结构里就有刚才建立的那个管道 对应的两个描述符,而后往管道的写端写如一个字节,由于上面的setup_thread里为管道的读端设置了感兴趣的事件,这个时候事件就触发了(人为的 触发一个事件,而且将刚才的链接放到一个队列里(这里是每一个线程一个队列),这样就将一个同步的事件转换为异步的了)。事件触发后,thread_libevent_process函数就会执行。 而后又进入到conn_new函数,进入到drive_machine函数,又根据conn的state进行事件处理。

PS:我不知道这种利用管道,而后人为的触发事件的机制是否是一种什么模式或惯用法。除了在memcache这里我见到了这种方式外,在java的 NIO里也有相似的使用。在java NIO里建立一个Selector后,会建立一个管道(在Linux上,而在windows上会建立一对socket),咱们能够经过向管道的写端写入一 个字节来唤醒已经阻塞的selector。

在介绍完memcache的网络处理部分后,下一篇咱们就能够看看memcache是如何从网络上读取内容,解析命令的。 


原文连接: http://www.cnblogs.com/yuyijq/archive/2012/01/02/2291025.html
相关文章
相关标签/搜索