Memcached的网络模型是基于Libevent网络库开发的,同时Memcached采用多线程的工做方式,工做线程和主线程之间采用pipe进行通讯。Memcached的网络线程模型主要涉及两个主要文件:memcached.c 和thread.c文件。redis
Memcached的网络模型流程大体以下:网络
一、memcached会在main函数中建立主线程的event_base,将监听端口的socket注册到主线程的event_base,由主线程来监听和接受客户端链接。数据结构
二、main函数建立主线程的同时,也会建立N个工做线程,每一个工做线程都拥有各自的event_base 和LIBEVENT_THREAD数据结构来存储线程的信息(线程基本信息、线程队列、pipe文件描述符)。工做线程会将pipe管道的接收端 fd 注册到本身的event_base。多线程
三、当有新链接创建时,主线程会经过accept 函数来与客户端创建新链接,同时将新链接相关的信息填入CQ_ITEM结构并放入工做线程的conn_queue队列,同时向选定的工做线程的管道写入字符,以此触发工做线程的libevent事件。框架
四、主线程是经过求余数的方式来选择线程池中的一个工做线程,工做线程获得通知后,会从conn_queue队列中取出CQ_ITEM,并将fd注册到工做线程的Libevent实例上,从而由工做线程来处理该链接的全部后续事件。socket
总体框架图:tcp
主线程的主要工做就是监听端口和初始化工做线程。下面代码值列出一部分相关内容。memcached
int main (int argc, char **argv) { //这个方法主要用来建立工做线程 memcached_thread_init(settings.num_threads, NULL); errno = 0; if (settings.port && server_sockets(settings.port,tcp_transport,portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); } /* enter the event loop */ //这边开始进行主线程的事件循环 if (event_base_loop(main_base, 0) != 0) { retval = EXIT_FAILURE; } }
memcached会经过memcached_thread_init 方法来建立工做线程。函数
void memcached_thread_init(int nthreads, void *arg) { //......省略部分代码 for (i = 0; i < nthreads; i++) { int fds[2]; //这边会建立pipe,主要用于主线程和工做线程之间的通讯 if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } // threads是每一个线程都拥有的基本结构:LIBEVENT_THREAD threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; //这个方法很是重要,主要是建立每一个线程本身的libevent的event_base //监听本身的通讯管道接收端,同时初始化工做队列 setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats_state.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ //这里是循环建立线程 //线程建立的回调函数是worker_libevent for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); }
setup_thread 方法建立线程本身的event_base,工做线程在初始化时会将pipe的写事件注册到event_base,其写事件回调函数为 thread_libevent_process。当主线程接受到客户端链接时,向工做线程的pipe写字符,就会触发工做线程的thread_libevent_process 回调函数。oop
static void setup_thread(LIBEVENT_THREAD *me) { //.......省略部分代码 //每一个独立的线程都应该有本身独立的event_base me->base = event_init(); if (! me->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } /* Listen for notifications from other threads */ //这边很是重要,这边主要建立pipe的读事件EV_READ的监听 //当pipe中有写入事件的时候,libevent就会回调thread_libevent_process方法 event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); //添加事件操做 if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } //初始化一个工做队列 me->new_conn_queue = malloc(sizeof(struct conn_queue)); if (me->new_conn_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } cq_init(me->new_conn_queue); //初始化线程锁 if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { perror("Failed to initialize mutex"); exit(EXIT_FAILURE); } me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*), NULL, NULL); if (me->suffix_cache == NULL) { fprintf(stderr, "Failed to create suffix cache\n"); exit(EXIT_FAILURE); } }
以上是工做线程建立时初始化event_base的部分,真正建立线程的方法是 memcached_thread_init 中的create_work方法。
//真正建立工做线程 static void create_worker(void *(*func)(void *), void *arg) { pthread_attr_t attr; int ret; pthread_attr_init(&attr); if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); exit(1); } }
create_worker方法在建立线程时,指定了线程的运行函数为worker_libevent,工做线程的运行函数其实就是进入事件循环,等待监听的事件触发。
//工做线程运行函数 static void *worker_libevent(void *arg) { //......省略部分代码 register_thread_initialized(); //这个方法主要是开启事件的循环 //每一个线程中都会有本身独立的event_base和事件的循环机制 //memcache的每一个工做线程都会独立处理本身接管的链接 event_base_loop(me->base, 0); //销毁event_base event_base_free(me->base); return NULL; }
到目前为止,每一个工做线程的初始化工做已经完成,每一个工做线程只监听了pipe的写事件,其回调函数为thread_libevent_process。
//管道有数据写入时的回调函数 static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; conn *c; unsigned int timeout_fd; //回调函数中回去读取pipe中的信息 //主线程中若是有新的链接,会向其中一个线程的pipe中写入1 //这边读取pipe中的数据,若是为1,则说明从pipe中获取的数据是正确的 if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } switch (buf[0]) { case 'c': //从工做线程的队列中获取一个CQ_ITEM链接信息 item = cq_pop(me->new_conn_queue); //若是item不为空,则须要进行链接的接管 if (NULL == item) { break; } switch (item->mode) { case queue_new_conn: //conn_new这个方法很是重要,主要是建立socket的读写等监听事件。 //init_state 为初始化的类型,主要在drive_machine中经过这个状态类判断处理类型 c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } break; case queue_redispatch: conn_worker_readd(item->c); break; } cqi_free(item); break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); break; /* a client socket timed out */ case 't': if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) { if (settings.verbose > 0) fprintf(stderr, "Can't read timeout fd from libevent pipe\n"); return; } conn_close_idle(conns[timeout_fd]); break; } }
在新链接到来时,会调用conn_new 函数,监听新链接的读写事件。而且读写事件的回调函数为event_handler,event_handler方法的核心是 drive_machine,在这个函数中,memcached会根据链接的不一样状态来进行不一样的操做。
//主线程主要是监听用户的socket链接事件;工做线程主要监听socket的读写事件 //当用户socket的链接有数据传递过来的时候,就会调用event_handler这个回调函数 conn *conn_new(){ //......省略部分代码 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); }
static void drive_machine(conn *c) { //......省略部分代码 while (!stop) { //这边经过state来处理不一样类型的事件 switch(c->state) { //这边主要处理tcp链接,只有在主线程的下,才会执行listening监听操做 //监听状态 case conn_listening: //...... //等待状态,等待客户端的数据报文到来 case conn_waiting: //...... //读取事件 //例若有用户提交数据过来的时候,工做线程监听到事件后,最终会调用这块代码 //读取数据的事件,当客户端有数据报文上传的时候,就会触发libevent的读事件 case conn_read: //...... } } return; }
drive_machine方法也是主线程event_base 回调函数的核心,主线程的socket是经过main函数中server_sockets方法建立的,而server_sockets中主要调用了server_socket这个方法,咱们能够看下server_socket这个方法:
static int server_socket(const char *interface, int port, enum network_transport transport,FILE *portnumber_file) { //建立一个新的事件 //咱们发现上面的工做线程也是调用这个方法,可是区别是这个方法指定了state的类型为:conn_listening //注意这边有一个conn_listening,这个参数主要是指定调用drive_machine这个方法中的conn_listen代码块。 if (!(listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } listen_conn_add->next = listen_conn; listen_conn = listen_conn_add; }
conn_new 方法已经介绍过了,该方法最终会进入drive_machine方法,而且链接状态为 conn_listening。memcached在 conn_listening的状态时,会调用dispath_conn_new来将新链接的相关信息push到工做线程的队列中。
case conn_listening: addrlen = sizeof(addr); sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); if (sfd == -1) { if (use_accept4 && errno == ENOSYS) { use_accept4 = 0; continue; } perror(use_accept4 ? "accept4()" : "accept()"); if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } if (settings.maxconns_fast && stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats.rejected_conns++; STATS_UNLOCK(); } else { //若是客户端用socket链接上来,则会调用这个分发逻辑的函数 //这个函数会将链接信息分发到某一个工做线程中,而后工做线程接管具体的读写操做 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); } stop = true; break;
dispath_conn_new 方法其实就是申请CQ_ITEM结构来保存链接信息,并将该结构PUSH到选定线程的队列中,同时向该线程的pipe写入字符,触发该工做线程的libevent网络时间,从源码也能够发现,memcached选择工做线程的方式是经过取余数来实现的。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { //每一个链接连上来的时候,都会申请一块CQ_ITEM的内存块,用于存储链接的基本信息 CQ_ITEM *item = cqi_new(); char buf[1]; //若是item建立失败,则关闭链接 if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } //这个方法很是重要。主要是经过求余数的方法来获得当前的链接须要哪一个线程来接管 //并且last_thread会记录每次最后一次使用的工做线程,每次记录以后就可让工做线程进入一个轮询,保证了每一个工做线程处理的链接数的平衡 int tid = (last_thread + 1) % settings.num_threads; //获取线程的基本结构 LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; item->mode = queue_new_conn; //向工做线程的队列中放入CQ_ITEM cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; //向工做线程的pipe中写入1 //工做线程监听到pipe中有写入数据,工做线程接收到通知后,就会向thread->new_conn_queue队列中pop出一个item,而后进行链接的接管操做 if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } }
如下是储存链接信息的CQ_ITEM结构以及每一个线程的处理队列结构。处理队列结构其实是链表实现的。
//储存链接信息的CQ_ITEM结构 typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; //socket的fd enum conn_states init_state; //事件类型 int event_flags; //libevent的flags int read_buffer_size; //读取的buffer的size enum network_transport transport; CQ_ITEM *next; //下一个item的地址 }; //每一个线程的处理队列结构。 typedef struct conn_queue CQ; struct conn_queue { CQ_ITEM *head; CQ_ITEM *tail; pthread_mutex_t lock; }