前面分析了worker线程的初始化,以及主线程建立socket并监听的过程。本节会分析链接如何创建与分发。数组
A,能够摸清楚master线程的大体逻辑:网络
1)初始化各个worker线程socket
2)执行socket,bind,listen...主线程进行监听tcp
3)一旦有新的链接创建,则调用event_handlermemcached
B,woker线程被建立以后的逻辑:函数
1)监听管道recv端的fd,一旦有数据过来,则调用thread_libevent_processoop
注意,worker线程其实也是利用event_base_loop将本身进行阻塞。主线程阻塞在监听的fd上,而worker线程则阻塞在监听管道recv端的fd上。回忆一下前文,memcached_thread_init函数中创建了管道,用于master线程和worker线程间的通讯。只有master线程接受了新的请求以后,才会利用管道告知worker线程,而worker线程只有等管道有数据传输来的时候,才会被唤醒。this
用图展示初始状态下的master和worker线程:spa
图中假设主线程socket函数返回的fd是26。能够看到master线程以及4条worker线程,都由于没有任何事件而处于阻塞状态。线程
另外,前文提到一个很重要的结构体conn,conns数组由conn指针组成。memcahed的每一个链接都对应有一个conn实例,能够根据fd在conns数组里找到。因为master线程的套接口是26,因此conns[26]的指向的conn就表示master线程正监听的socket以及一些附加信息。
咱们来模拟一下有链接过来时,memcached内部的执行。前文提到,一旦有链接过来,则master线程会被触发执行event_handler函数。
void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg; assert(c != NULL); c->which = which; /* sanity */ if (fd != c->sfd) { if (settings.verbose > 0) fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n"); conn_close(c); return; } // 主要逻辑所有封装在drive_machine中 drive_machine(c); /* wait for next event */ return; }
event_handler中传入的conn,就是conns[26]。
在event_handler里会继续将这个conn传递给drive_machine。
static void drive_machine(conn *c) { bool stop = false; int sfd; socklen_t addrlen; struct sockaddr_storage addr; int nreqs = settings.reqs_per_event; int res; const char *str; #ifdef HAVE_ACCEPT4 static int use_accept4 = 1; #else static int use_accept4 = 0; #endif assert(c != NULL); // 一个大的while循环,维护了一个状态机,根据conn的当前状态作出处理,跳到下一状态 while (!stop) { switch(c->state) { // 初始状态为conn_listening case conn_listening: addrlen = sizeof(addr); // accept链接,会产生一个新的fd,该链接以后的读写均经过新fd完成 #ifdef HAVE_ACCEPT4 if (use_accept4) { sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK); } else { sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); } #else sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif // accept失败 if (sfd == -1) { // 若是是accept4未被实现,则换用accept继续尝试接受链接 if (use_accept4 && errno == ENOSYS) { use_accept4 = 0; continue; } perror(use_accept4 ? "accept4()" : "accept()"); // 若是链接队列已经没有未处理的链接,则终止循环 if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } // 链接打满,accept_new_conns(false)会终止event继续触发 else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } // 设置新的套接字为非阻塞 if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } if (settings.maxconns_fast && stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats.rejected_conns++; STATS_UNLOCK(); } else { // !!!分发并通知worker线程有一个新的链接 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); } stop = true; break; case conn_waiting: ... case conn_read: ... case conn_parse_cmd : ... case conn_new_cmd: ... case conn_nread: ... case conn_swallow: ... case conn_write: ... case conn_mwrite: ... case conn_closing: ... case conn_closed: ... case conn_watch: ... case conn_max_state: ... } } return; }
前文曾提到drive_machine是一个大的状态机。上面的代码只保留了对conn_listening的处理,由于master线程接受新链接时,就是这个状态。
代码里调用accept4或者accept函数产生新的fd,为27。accept以后,主要就是调用dispatch_conn_new来对链接作分发,而且通知worker线程。
咱们来看dispatch_conn_new的实现:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { // CQ_ITEM用于封装新链接的一些信息 CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } // 挑选worker线程,采用轮循机制 int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; // 设置item item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; // 将item放入线程的new_conn_queue队列 cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); // 经过管道,写入一字节的c,用来达到通知子线程的目的 buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } }
能够很明显的看到,worker线程的分发是采用轮循机制的,每次选出来的都是threads数组中的下一个。
CQ_ITEM结构体用于封装新accept的链接的一些相关信息,每一个线程内部都维护着一个CQ_ITEM队列。当主线程经过管道写入字符c以后,子线程会被通知到,有一个新的链接来了。因而,子线程随后当即从CQ_ITEM队列中取出CQ_ITEM,并对这个新的链接设置监听事件等等。子线程具体的实现,后面会分析到,这里先看下CQ_ITEM:
typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; // accept产生的新fd enum conn_states init_state; // 子线程拿到新的链接以后,链接对应的状态 int event_flags; // 子线程对新链接设置的事件 int read_buffer_size; // 通常2M enum network_transport transport; // tcp or udp conn *c; CQ_ITEM *next; };
好,至此咱们已经看完了主线程所作的工做。
只要不断的有新链接进来,主线程就会不断调用event_handler,并在drive_machine状态机中accept & dispatch链接。至于接下来,接收客户端发来的命令并作出响应等等,都是在worker线程里完成。
这张图画出了子线程中的CQ_ITEM队列,以及主线程经过管道告知子线程。
注意worker1线的链接,fd分别是27,31,35...由于中间其余fd对应的链接会分别被worker2,worker3,worker4处理。
另外借用一张其余博客的图,也挺清楚的:
本小节开始看worker线程获取通知以后,所作的一些处理。前文说到,marster线程会向管道写入一个字符'c',用来告知worker线程有新的链接了。因而worker线程监听管道的事件被触发,worker线程会进入thread_libevent_process函数:
static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; unsigned int timeout_fd; // 从管道里读取一个字节 if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } switch (buf[0]) { // 字符c case 'c': // item是从new_conn_queue队列中取出的一个CQ_ITEM对象 item = cq_pop(me->new_conn_queue); if (NULL != item) { // 调用conn_new来建立conn,而且设置监听事件 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } // 释放item cqi_free(item); } break; case 'r': ... case 'p': ... case 't': ... } }
注意conn_new函数,conn_new在前文出现过(server_socket函数中用conn_new建立了conns[26])。本例中,conn_new执行完以后,会新建立一个conn对象,而且将conns[27]指向它,今后之后,线程worker1就利用该conn来与客户端交互。
值得一提的是,新的conn中,state被置为conn_new_cmd,代表该链接已经创建,等待接受client发送的命令。而主线程所使用的conns[26],state永远为conn_listening,代表主线程一直在等待新的链接。
在conn_new中,设置事件的语句为:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event);
可见,子线程的事件被触发以后,也是调用event_handler函数,和主线程的事件触发的函数同样。
回忆下event_handler中的状态机,worker线程的conn初始状态为conn_new_cmd,因此一旦worker线程接受到client的命令,便会进入drive_machine中的case conn_new_cmd分支。固然这涉及到后续具体的命令处理,已经不在本文的探讨范畴了。
最后来看一下worker线程thread_libevent_process处理完毕以后的状态: