转载请注明出处http://www.javashuo.com/article/p-kbglwwgv-e.html,谢谢合做!html
前面已经分析了memcached中的slabs内存管理及缓存对象如何利用item表达并存储在slabs管理的空间中,并分析了如何实现LRU策略实现缓存对象的释放。但memcached是一个client-server结构的缓存系统,服务端须要接收客户端的指令,而后在服务端作相应的操做,好比新建缓存对象,读取缓存对象。memcached使用libevent实现这些网络通讯操做,接下来将会分析memcached如何使用libevent处理网络事件。git
一般的网络编程模型中,服务端都会有一个处于listening状态的socket,当客户端尝试与服务端创建链接时,服务端会创建一个新的socket与该客户端通讯,并将该通讯任务交给其它线程完成。事实上,memcached也采用了这样的模型,但它经过libevent实现这个模型。github
简单地说libevent的工做方式就是:一个event_base,一些加入event_base中的events,每个event都有一个回调函数,event_bse不停地循环等待这些events发生,事件发生后调用对应event的回调函数。编程
memcached中有2类event_base:main_base,位于主线程中,处理listening socket;normal event_base,位于工做线程中,处理accepted socket。它们的关系能够简单地以图1-1表示,数组
图1-1 memcached中的工做线程模型缓存
(ps: 工做线程的数量能够在启动memcached时设置)网络
memcached使用struct LIBEVENT_THREAD结构保存工做线程相关的信息,其定义以下,数据结构
typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ #ifdef EXTSTORE cache_t *io_cache; /* IO objects */ void *storage; /* data object for storage system */ #endif logger *l; /* logger buffer */ void *lru_bump_buf; /* async LRU bump buffer */ #ifdef TLS char *ssl_wbuf; #endif } LIBEVENT_THREAD;
thread_id它保存了线程的线程id,base,保存了属于该线程的event_base结构。socket
notify_event, notify_received_fd, notify_send_fd三个成员一块儿用于完成唤醒线程的任务,notify_event被加入到它的event_base中,监听nofity_received_fd上的读事件,当有读事件发生时,唤醒线程。外部想要唤醒该线程时,向notify_send_fd发送数据,便可引起nofity_received_fd上的读事件。这两个fd的关联经过pipe通道完成。async
另外有一个new_conn_queue成员,这个成员用来协助完成主线程的任务分发工做。当主线程接受新的链接后,创建一个struct conn_queue_item结构,并将该结构push到某个工做线程的new_conn_queue指向的队列上。worker thread被唤醒后从该队列上pop新的链接创建新的事件加入它的event_base,开始监听它的网络通讯。如下是new_conn_queue的及其元素CQ_ITEM的定义
/* A connection queue. */ typedef struct conn_queue CQ; struct conn_queue { CQ_ITEM *head; CQ_ITEM *tail; pthread_mutex_t lock; }; typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; enum conn_states init_state; int event_flags; int read_buffer_size; enum network_transport transport; enum conn_queue_item_modes mode; conn *c; void *ssl; CQ_ITEM *next; };
CQ_ITEM中sfd包含了相应的socket,transport表示传输层协议类型,read_buffer_size表明读数据的buffer初始大小,init_state表示这个conn的初始状态,用于struct conn结构,conn结构是memcached中用于读写数据的一个关键数据结构,后面将进行介绍。相关的队列操做函数是cq_pop与cq_push。
不管是listening socekt的事件回调函数仍是accepted socket的事件回调函数,在memcached中都是event_handler,该函数会调用一个至关复杂的函数drive_mathine,它会根据状态进行不一样的处理。如main thread中的listening socket有链接事件到来时,回调函数将调用accept函数接受链接请求,并将新创建的socket交给某个worker thread处理。worker thread中的socket的事件发生时,处理工做相对较复杂,包括接收数据,创建item,并进行存储,或者返回响应信息等。
首先,为了服务端与客户端的沟通,memcached定义了它的信息的格式协议。协议有两类,一类是text protocol,一类是binary protocol,具体可参考https://github.com/memcached/memcached/wiki/Protocols。事件回调函数中根据协议对消息进行解析,并进行响应。
前面提到,memcached中全部的libevent网络事件的回调函数都是event_handler,而这个函数又简单地调用了drive_machine完成工做,drive_mathine根据conn_states值作不一样的操做。每个链接在memcached中都使用一个conn结构进行描述,其中包含了它的socket, event及读写数据使用到的相关buffer描述,而conn_states正是描述struct conn目前所处的状态的成员,其取值范围以下,
/** * Possible states of a connection. */ enum conn_states { conn_listening, /**< the socket which listens for connections */ conn_new_cmd, /**< Prepare connection for next command */ conn_waiting, /**< waiting for a readable socket */ conn_read, /**< reading in a command line */ conn_parse_cmd, /**< try to parse a command from the input buffer */ conn_write, /**< writing out a simple response */ conn_nread, /**< reading in a fixed number of bytes */ conn_swallow, /**< swallowing unnecessary bytes w/o storing */ conn_closing, /**< closing this connection */ conn_mwrite, /**< writing out many items sequentially */ conn_closed, /**< connection is closed */ conn_watch, /**< held by the logger thread as a watcher */ conn_max_state /**< Max state value (used for assertion) */ };
conn_listening状态下drive_machine会从该listening socket上接受客户端链接,创建新的链接,分发给工做线程。
具体的状态转移及所作的相关操做见图2-1,因为其中使用了许多struct conn的成员,在此以前先前conn的定义列出:
/** * The structure representing a connection into memcached. */ struct conn { int sfd; #ifdef TLS SSL *ssl; char *ssl_wbuf; bool ssl_enabled; #endif sasl_conn_t *sasl_conn; bool sasl_started; bool authenticated; enum conn_states state; enum bin_substates substate; rel_time_t last_cmd_time; struct event event; short ev_flags; short which; /** which events were just triggered */ char *rbuf; /** buffer to read commands into */ char *rcurr; /** but if we parsed some already, this is where we stopped */ int rsize; /** total allocated size of rbuf */ int rbytes; /** how much data, starting from rcur, do we have unparsed */ char *wbuf; char *wcurr; int wsize; int wbytes; /** which state to go into after finishing current write */ enum conn_states write_and_go; void *write_and_free; /** free this memory after finishing writing */ char *ritem; /** when we read in an item's value, it goes here */ int rlbytes; /* data for the nread state */ /** * item is used to hold an item structure created after reading the command * line of set/add/replace commands, but before we finished reading the actual * data. The data is read into ITEM_data(item) to avoid extra copying. */ void *item; /* for commands set/add/replace */ /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ /* data for the mwrite state */ struct iovec *iov; int iovsize; /* number of elements allocated in iov[] */ int iovused; /* number of elements used in iov[] */ struct msghdr *msglist; int msgsize; /* number of elements allocated in msglist[] */ int msgused; /* number of elements used in msglist[] */ int msgcurr; /* element in msglist[] being transmitted now */ int msgbytes; /* number of bytes in current msg */ item **ilist; /* list of items to write out */ int isize; item **icurr; int ileft; char **suffixlist; int suffixsize; char **suffixcurr; int suffixleft; #ifdef EXTSTORE int io_wrapleft; unsigned int recache_counter; io_wrap *io_wraplist; /* linked list of io_wraps */ bool io_queued; /* FIXME: debugging flag */ #endif enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ int (*try_read_command)(conn *c); /* pointer for top level input parser */ ssize_t (*read)(conn *c, void *buf, size_t count); ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags); ssize_t (*write)(conn *c, void *buf, size_t count); };
图2-1 struct conn的状态转移图
如今结合struct conn定义与状态转移图,对一个简单的客户端命令开始到客户端收到返回消息的过程进行描述,
conn_mwrite状态下,drive_machine会向链接的客户端发送消息(调用transmit函数),若是顺利完成则进入下一次的conn_new_cmd状态。
conn_write状态表示要发送的数据在wbuf中,须要先加入到msglist中,再进入conn_mwrite状态。
若是处理过程当中出现了错误,致使链接发生混乱或者链接中断了,那么进入conn_closing,关闭链接。
memcached中向客户端返回消息使用了库函数sendmsg,所以须要返回给客户端的消息都存储在结构struct conn中的msglist中,其存储结构能够大体表示如图2-2:
图2-2 struct conn结构中msglist的结构
msglist是一个动态扩展的msghdr数组, msgused是数组中已被使用的部分的计数, msgsize是数组长度,msgcurr是目前发送的消息。
iov也是一个动态扩展的iovec数组,iovsize表示数据长度,iovused表示数组中已使用的部分的计数。
buf是存储须要发送的数据的空间,它的起始地址与长度记录在一个iovec结构中,这些buf通常是con中的wbuf, write_and_free,ilist中的item的数据区。
struct conn结构中有一个substate成员对于complete_nread处理binary protocol很重要,complete根据该值决定后续须要作的处理。substate的取值范围定义以下:
enum bin_substates { bin_no_state, bin_reading_set_header, bin_reading_cas_header, bin_read_set_value, bin_reading_get_key, bin_reading_stat, bin_reading_del_header, bin_reading_incr_header, bin_read_flush_exptime, bin_reading_sasl_auth, bin_reading_sasl_auth_data, bin_reading_touch_key, };
这些状态是与binary protocol的命令对应的,如
set命令,substate首先会设置为bin_reading_set_header状态,表示conns[id]在第一次conn_nread状态读取了它须要的额外信息,但它还有data须要读取,须要预分配conns[id].item,后续的conn_nread状态读数据直接读入item中。而后它会设置为bin_read_set_value状态,表示第二次conn_nread读取了data数据并已存入item中,须要将该item放入hashtable与lru链表中。
delete命令,substate会设置为bin_reading_del_header状态,它仅须要一次conn_nread状态读取所须要的额外信息,而后从hashtable与lru链表中删除item,返回删除结果。
struct conn中部分其它成员的做用:
ilist是一个指向item的指针的链表,存储的gets命令须要返回的item的指针,它也是一个动态扩展的数据,由于text协议的gets命令可能请求多个缓存对象。
protocol表明链接使用的协议类型,text或者binary
transport表明UDP或者TCP传输层协议
thread存储这个链接归属的工做线程的指针
suffixlist是一个char指针的动态扩展数组,存储着客户端的flags的字符串形式。
void event_handler(const int fd, const short which, void *arg)
网络事件的回调函数,事实上drive_machine才是核心函数,event_handler简单地调用了drive_machine。
static void drive_machine(conn *c)
根据c->state的值进行不一样的处理。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl)
conn_listening状态的conn在接受到新的链接后,调用该函数构造一个conn结构,并将其分发给工做线程处理。
static enum try_read_result try_read_network(conn *c)
conn_read状态的链接c调用该函数从网络上读取数据。
static int try_read_command_binary(conn *c)
在将数据读到c.rbuf后,调用该函数对binary protocol的命令进行解析,解析结果存储c.binary_head中。
static void dispatch_bin_command(conn *c)
在try_read_command_binary中调用,根据命令执行不一样的操做:命令不须要额外数据了,处理并返回结果;命令仍须要数据,调用bin_read_key设置好c.ritem、c.rlbytes及c.substate,函数返回,进入conn_nread状态。
static void complete_nread_binary(conn *c)
binaray命令须要的extras数据已经放到了缓冲区,根据c->substate决定完成命令并返回或者继续读data部分。
static int try_read_command_ascii(conn *c)
text协议使用的解析命令的函数,实际上调用process_command完成工做。
static void process_command(conn *c, char *command)
根据命令行中的命令关键字进行相应的处理,如get命令调用process_get_command;add/set命令调用process_update_command;incr/decr调用process_arithmetic_command…
static enum transmit_result transmit(conn *c)
将链接的c->msglist中的消息发送回客户端。核心操做为库函数sendmsg。