Following up on the previous article about redis's network communication module: memcached's networking is built on the libevent library, which simplifies the wrapping of low-level network read/write events, but memcached adds multithreaded processing on top.
Core flow: client -> memcached master on the server side -> master fd -> master accept(fd) -> client fd -> a worker thread handles all read/write events on that client fd
The master's processing flow lives in main(). Although main() spans lines 6892-8334 (1442 lines in total), lines 6892-8104 (1212 lines) only parse memcached's configuration and perform minor chores unrelated to the main flow.
The core code in main() is less than 230 lines (8104-8334), so it is still quite easy to analyze.
The core work the master does consists of the following points.
The event_base is the core shared struct of the libevent event loop; the event registration, startup and so on that come later all need this struct.
Note that memcached uses the variant of event_base_new() that takes a parameter; the parameter is a libevent configuration:
ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
main_base = event_base_new_with_config(ev_config);
No locks are allocated for this event_base, which gains a bit of libevent performance. Although this can make an event_base unsafe to use across multiple threads, memcached gives every thread its own event_base, so the problem does not arise.
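For reference, a minimal sketch of this setup (assuming libevent 2.x headers; the function name create_thread_base is illustrative, not memcached code):
#include <event2/event.h>

/* One lock-free event_base per thread, as memcached does.
 * This is only safe because the base is never shared across threads. */
struct event_base *create_thread_base(void) {
    struct event_config *cfg = event_config_new();
    if (cfg == NULL)
        return NULL;
    /* tell libevent not to allocate locks for this base */
    event_config_set_flag(cfg, EVENT_BASE_FLAG_NOLOCK);
    struct event_base *base = event_base_new_with_config(cfg);
    event_config_free(cfg);  /* the config is only needed at creation time */
    return base;
}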
The server_socket() function in memcached.c uses socket() and bind() to start the TCP server and obtains the fd that represents the memcached server.
The core code is as follows:
static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file, bool ssl_enabled) {
// ...
for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
// ...
continue;
}
// IPv6 logic omitted...
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
// ...
close(sfd);
continue;
} else {
success++;
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
// ...
}
// ...
}
// UDP logic omitted...
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base, NULL))) {
// ...
}
// ...
}
Although memcached's main() creates the master's libevent event loop right at the start, it is put to use quite late: only after the worker threads have all been initialized does it create the TCP server and register the callback for the master's event_base.
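As a reference for what such a registration looks like with the legacy libevent API that memcached uses (event_set/event_base_set), here is a hedged sketch; register_listener and on_accept are illustrative names, not memcached code:
#include <event.h>

/* Register a persistent read event on the listening fd with a given base.
 * on_accept fires every time a client tries to connect. */
static struct event listen_event;

void register_listener(struct event_base *base, int listen_fd,
                       void (*on_accept)(int, short, void *), void *arg) {
    event_set(&listen_event, listen_fd, EV_READ | EV_PERSIST, on_accept, arg);
    event_base_set(base, &listen_event);  /* bind the event to this base */
    event_add(&listen_event, NULL);       /* NULL timeout: wait indefinitely */
}
In memcached itself these steps (including the event_add) are performed by conn_new(), which is used for the listener conn and the client conns alike.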
The master's event_handler callback is as follows:
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;
assert(c != NULL);
c->which = which;
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
drive_machine(c);
/* wait for next event */
return;
}
As you can see, event_handler only does some sanity checks and then immediately hands things over to drive_machine().
drive_machine() mainly branches on the c->state variable. For the master's conn, the state field is set at memcached.c:653, and the value passed in at memcached.c::6242 is conn_listening. The corresponding drive_machine logic is as follows:
case conn_listening:
    addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
    if (use_accept4) {
        sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
    } else {
        sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    }
#else
    sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
    // code unrelated to the main flow omitted...
#ifdef TLS
    // SSL-related logic omitted...
#endif
    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                      DATA_BUFFER_SIZE, c->transport, ssl_v);
    // ...
    stop = true;
    break;
As you can see, once a new client connects to the master thread's TCP server, drive_machine immediately accepts the memcached client connection fd as sfd and hands it to dispatch_conn_new() for processing.
The core logic of dispatch_conn_new() is as follows:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl) {
CQ_ITEM *item = cqi_new();
char buf[1];
if (item == NULL) {
close(sfd);
/* given that malloc failed this may also fail, but let's try */
fprintf(stderr, "Failed to allocate memory for connection object\n");
return ;
}
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
item->mode = queue_new_conn;
item->ssl = ssl;
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
What this function mainly does: allocate a CQ_ITEM, pick the next worker thread round-robin, fill in the item (fd, initial state, event flags, buffer size, transport, ssl), push it onto that thread's new_conn_queue, and write one byte to the thread's notify_send_fd.
After that, as soon as the worker thread's event_base sees the readable event on notify_receive_fd it starts working. Because pipe-based inter-thread communication is used here, writing to notify_send_fd triggers the readable event on notify_receive_fd; for background on pipe-based thread communication see: blog.csdn.net/robertkun/a…
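A minimal sketch of this pipe-plus-libevent wake-up pattern (the names setup_notify/on_notify/wake_worker are illustrative, not memcached's; the real logic lives in thread.c):
#include <event.h>
#include <stdio.h>
#include <unistd.h>

static int notify_receive_fd;         /* worker reads from here */
static int notify_send_fd;            /* master writes to here  */
static struct event notify_event;

/* Worker side: fires when the master writes a byte into the pipe. */
static void on_notify(int fd, short which, void *arg) {
    char buf[1];
    if (read(fd, buf, 1) != 1)
        fprintf(stderr, "can't read from notify pipe\n");
    /* the real worker would now cq_pop() the new connection item */
}

/* Called once per worker: create the pipe and watch its read end. */
void setup_notify(struct event_base *worker_base) {
    int fds[2];
    if (pipe(fds) != 0) {
        perror("pipe");
        return;
    }
    notify_receive_fd = fds[0];
    notify_send_fd = fds[1];
    event_set(&notify_event, notify_receive_fd, EV_READ | EV_PERSIST, on_notify, NULL);
    event_base_set(worker_base, &notify_event);
    event_add(&notify_event, NULL);
}

/* Master side: one written byte is enough to wake the worker. */
void wake_worker(void) {
    if (write(notify_send_fd, "c", 1) != 1)
        perror("Writing to thread notify pipe");
}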
At this point the network-related main flow of the memcached master thread is complete; the rest is handed over to the worker threads.
When a client writes data to the memcached server, you can verify this with breakpoints in CLion, for example by putting a breakpoint on event_handler.
memcached.c::conn_new() registers the event callback:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
event_handler is the callback function being set here.
The worker threads' callback is already specified inside the master; see thread.c::setup_thread(), where the worker's callback is set to thread_libevent_process. Combined with the master's processing flow above, this worker callback is only triggered when the master receives a new connection.
See the thread.c::thread_libevent_process() function; the core code is as follows:
item = cq_pop(me->new_conn_queue);
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base, item->ssl);
if (c == NULL) {
// error handling omitted...
} else {
c->thread = me;
#ifdef TLS
// SSL logic omitted...
#endif
}
After the worker calls cq_pop to take the client connection off the queue, it hands it to conn_new() for processing.
conn_new() hands the client's read/write events to event_handler; the core code is as follows:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
The master uses event_handler to handle the master's events and the workers also use event_handler to handle theirs, yet they do not conflict: inside event_handler, the drive_machine body branches on the state field of the conn struct passed in as the event argument. A worker conn's state comes from the CQ_ITEM, and its value is conn_new_cmd.
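To make that branching concrete, here is a simplified model (not memcached's actual code; the demo_* names are made up) of how a single handler can serve both roles by switching on the conn state:
/* Simplified model of drive_machine's dispatch-by-state idea. */
enum demo_state { DEMO_LISTENING, DEMO_NEW_CMD };

struct demo_conn {
    int sfd;
    enum demo_state state;
};

static void demo_drive_machine(struct demo_conn *c) {
    switch (c->state) {
    case DEMO_LISTENING:
        /* master's conn: accept() the client and dispatch it to a worker */
        break;
    case DEMO_NEW_CMD:
        /* worker's conn: read and parse the next command from the client */
        break;
    }
}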
When the client sends data to the memcached server, event_handler fires; at that point state is conn_new_cmd, and the corresponding drive_machine core code is as follows:
while (!stop) {
    switch (c->state) {
    // ...
    case conn_new_cmd:
        /* Only process nreqs at a time to avoid starving other connections */
        --nreqs;
        if (nreqs >= 0) {
            reset_cmd_handler(c);
        } else {
            // error handling...
        }
        break;
    // ...
    }
}
The outermost layer is an infinite loop, so there must be an exit somewhere inside it; the request is handed to reset_cmd_handler(), whose core code is as follows:
static void reset_cmd_handler(conn *c) {
// ...
if (c->rbytes > 0) {
conn_set_state(c, conn_parse_cmd);
} else {
conn_set_state(c, conn_waiting);
}
}
If there is still data to read, the state is set to conn_parse_cmd; otherwise it enters the conn_waiting state.
case conn_parse_cmd :
if (c->try_read_command(c) == 0) {
/* wee need more data! */
conn_set_state(c, conn_waiting);
}
break;
The data is read, and if try_read_command(c) returns 0 (more data is needed) the state is set back to conn_waiting. try_read_command is where the client's input is processed and the result is returned to the client; see try_read_command_ascii:
static int try_read_command_ascii(conn *c) {
char *el, *cont;
if (c->rbytes == 0)
return 0;
el = memchr(c->rcurr, '\n', c->rbytes);
if (!el) {
// error handling omitted...
return 0;
}
cont = el + 1;
if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
el--;
}
*el = '\0';
assert(cont <= (c->rcurr + c->rbytes));
c->last_cmd_time = current_time;
process_command(c, c->rcurr);
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
assert(c->rcurr <= (c->rbuf + c->rsize));
return 1;
}
process_command is called to handle the request, and at this point memcached's network handling is finished; see memcached.c::process_command().
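For a rough feel of what happens to the '\0'-terminated command line next, here is only an illustrative sketch (not memcached's tokenize_command/process_command): split on spaces and dispatch by verb.
#include <stdio.h>
#include <string.h>

static void demo_process_command(char *command) {
    char *saveptr = NULL;
    char *verb = strtok_r(command, " ", &saveptr);
    if (verb == NULL)
        return;
    if (strcmp(verb, "get") == 0) {
        char *key;
        while ((key = strtok_r(NULL, " ", &saveptr)) != NULL)
            printf("lookup key: %s\n", key);  /* the real code hits the hash table */
    } else if (strcmp(verb, "set") == 0) {
        /* the real code parses flags/exptime/bytes and switches to conn_nread */
    }
}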
This article is based on memcached's master branch; see: github.com/memcached/m…
The first version of memcached had no multithreading; the multithreaded worker handling was added later. See version 1.2.0 on GitHub: github.com/memcached/m…
The pthread_create function is what creates the threads.
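For illustration, a minimal sketch of spawning a worker thread that runs its own event loop (worker_main/spawn_worker are made-up names; memcached's real versions live in thread.c):
#include <pthread.h>
#include <stdio.h>
#include <event2/event.h>

/* Each worker owns one event_base and loops on it forever. */
static void *worker_main(void *arg) {
    struct event_base *base = arg;
    event_base_dispatch(base);  /* blocks, processing this thread's events */
    return NULL;
}

void spawn_worker(struct event_base *base) {
    pthread_t tid;
    if (pthread_create(&tid, NULL, worker_main, base) != 0)
        perror("pthread_create");
}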
memcached uses a multithreaded model rather than a multiprocess one, which is very important: it means the master thread and the worker threads can share some structs, such as the LIBEVENT_THREAD struct that represents a worker thread. Its source is as follows:
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
#ifdef EXTSTORE
cache_t *io_cache; /* IO objects */
void *storage; /* data object for storage system */
#endif
logger *l; /* logger buffer */
void *lru_bump_buf; /* async LRU bump buffer */
#ifdef TLS
char *ssl_wbuf;
#endif
} LIBEVENT_THREAD;
This is how memcached's master triggers a worker thread to handle a new client connection fd: by writing data to the shared notify_send_fd field.
thread.c::dispatch_conn_new() -> cq_push(thread->new_conn_queue, item) -> write(thread->notify_send_fd, buf, 1)
write(thread->notify_send_fd, buf, 1) is what triggers the readable event on notify_receive_fd in the worker thread's event_base.
enum conn_states {
conn_listening, /**< the socket which listens for connections */
conn_new_cmd, /**< Prepare connection for next command */
conn_waiting, /**< waiting for a readable socket */
conn_read, /**< reading in a command line */
conn_parse_cmd, /**< try to parse a command from the input buffer */
conn_write, /**< writing out a simple response */
conn_nread, /**< reading in a fixed number of bytes */
conn_swallow, /**< swallowing unnecessary bytes w/o storing */
conn_closing, /**< closing this connection */
conn_mwrite, /**< writing out many items sequentially */
conn_closed, /**< connection is closed */
conn_watch, /**< held by the logger thread as a watcher */
conn_max_state /**< Max state value (used for assertion) */
};
The CQ_ITEM's init_state is assigned conn_new_cmd at memcached.c::5725.
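The CQ_ITEM and its queue are defined in thread.c; here is a simplified, illustrative model of that producer/consumer queue (field and function names are trimmed demo names, not the exact source):
#include <pthread.h>
#include <stddef.h>

/* The master pushes items; a worker pops them after the pipe wake-up. */
typedef struct cq_item_demo {
    int sfd;                       /* the accepted client fd */
    int init_state;                /* initial conn state, e.g. conn_new_cmd */
    struct cq_item_demo *next;
} cq_item_demo;

typedef struct {
    cq_item_demo *head, *tail;
    pthread_mutex_t lock;
} cq_demo;

static void cq_demo_push(cq_demo *q, cq_item_demo *item) {
    item->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail != NULL)
        q->tail->next = item;
    else
        q->head = item;
    q->tail = item;
    pthread_mutex_unlock(&q->lock);
}

static cq_item_demo *cq_demo_pop(cq_demo *q) {
    pthread_mutex_lock(&q->lock);
    cq_item_demo *item = q->head;
    if (item != NULL) {
        q->head = item->next;
        if (q->head == NULL)
            q->tail = NULL;
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}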
The TCP server established by the master and the memcached client connections it accepts are all just fds; take care to distinguish which role each fd plays.