转 http://www.cnblogs.com/Seapeak/archive/2010/04/08/1707807.html html
在linux平台上使用c开发网络程序的同志们通常状况下都对鼎鼎大名的libevent很是的熟悉了。可是一些新进入此领域的new new people们对此都是一头雾水。本来的迷茫再加上开源软件一向的“帮助文件”缺失做风,让咱们这些新手们显的很是的无助。幸亏有一些热心的朋友们帮忙,才化险为夷啊!linux
前几天一直在开发一个locker server,虽然公司现有的locker server能很好的运转,可是毕竟是net的,通用性不广,当咱们要在linux上开发多集群系统的时候现有的locker server就未免显得有点捉襟见肘了。正是在开发locker server的过程当中使用到了libevent。apache
整体上,libevent是很好用的。一两个函数就能搞定你复杂的网络通信工做了。固然了,这句话得用在你使用的是“单线程”的状况下。虽然在linux系统中,进程的资源和window系统中进程的资源相比轻量级不少,代价也至关的没有那么昂贵,因此不少的软件都是使用“多进程”方式实现的,好比大名鼎鼎的apache。可是在咱们的系统中,咱们使用了“单进程多线程”的方式,这样,咱们就能在单机上启动多个进程,以达到“伪分布式”的效果来达到测试的目的。安全
那么这个时候就要注意libevent的使用了,由于对于event_base来讲,不是线程安全的。也就是说多线程不能share同一个event_base,就算是加锁操做也不行。那么这个时候就只能采起“单线程单event_base”的策略了。个人作法是作一个task pool(任务对象池),每一个任务会被一个thread执行,固然了,thread确定也是从thread pool拿出来的,而在task pool初始化的时候,我就给每一个task中的event_base初始化了对象,这样,万事大吉了。网络
这个地方注意了之后,就开始说网络通信了。在使用libevent的时候,触发事件是在接收到网络链接(或者timeout事件超时)的时候。因此你须要在事件处理函数中判断时间源,其次libevent接收网络通信的字节流时是使用了libevnet中自带的缓冲的,因此当你接收的时候必定要注意累加,而且屡次loop或者注册 event_event中的事件。因此在个人task中,会有接收的data。固然了若是你的协议是分为header和body的,一般header比较短,body比较长,并且在client,header和body一般是连续发送的,这样,在使用libevent的时候,header和body是同时被接收到的,这点必定要注意,因此提醒你在接收数据的函数中,须要区分接收header部分仍是body部分;当body很是长,超过libevent的缓冲时,是须要屡次屡次触发接收函数的,这点也要注意,就是让你须要在接收的时候除了区分header和body之外,还要注意一次接收不彻底的状况下,对于数据须要累加。多线程
当你在使用libevent时,event_set事件时,只要不是使用EV_PERSIST注册的事件是不须要在接收完一次数据后屡次event_add的,只有当你不使用EV_PERSIST时,你的事件才须要屡次event_add到event_base中;固然了,使用了EV_PERSIST注册的函数在event_base被task pool回收时是要显式的event_del该注册事件的,没有使用EV_PERSIST注册的事件是不须要显式的使用event_del删除该事件的。less
static void read_buffer(int client_socket_fd,short event_type,void *arg) { if(NULL == arg) { log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__); return; } task_info_t *task_info = (task_info_t *) arg; if(event_type == EV_TIMEOUT) /* 这个地方注意须要判断是否超时 由于我event_add事件的时候没有使用ev_persist 因此当超时时须要再add一次事件到event_base的loop中 */ { if(0 != event_add(&task_info->on_read,&task_info->timeout)) { log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error."); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } int bytes; /* 这个地方就是开始接收头部 接收头部时,可能分为好几回从缓冲中取得,因此须要一个while累加 */ while(header == task_info->read_type)//recv header { bytes = recv(client_socket_fd,task_info->header_buffer+task_info->offset,REQUEST_LENGTH -task_info->offset,0); if(0 > bytes ) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", __LINE__, errno, strerror(errno)); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } else if(0 == bytes) { log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", __LINE__); close(task_info->on_read.ev_fd); task_pool_push(task_info); return; } if(REQUEST_LENGTH > bytes+task_info->offset) { log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__); task_info->offset += bytes; if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { task_info->read_type = body; deal_request_header(task_info); task_info->body_buffer = (char *) malloc(task_info->request_info.length); if(NULL == task_info->body_buffer) { log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__); close(client_socket_fd); task_pool_push(task_info); return; } memset(task_info->body_buffer,0,task_info->request_info.length); task_info->offset = 0;//set recv body buffer offset to 0 break; } } /* 这个地方就是开始接收body, 和header同样,也要考虑body屡次接收累加的状况。 */ while(body == task_info->read_type) { bytes = recv(client_socket_fd,task_info->body_buffer+task_info->offset,task_info->request_info.length-task_info->offset,0); if(0 > bytes ) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", __LINE__, errno, strerror(errno)); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } else if(0 == bytes) { log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", __LINE__); close(task_info->on_read.ev_fd); task_pool_push(task_info); return; } if(task_info->request_info.length-task_info->offset > bytes) { log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__); task_info->offset += bytes; if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { task_info->read_type = unspecified; break; } } deal_request_body(client_socket_fd,task_info); return; }
void deal_working_thread(void *arg) { log_info("debug to this."); int client_socket_fd = (int) arg; if(0 > client_socket_fd) { log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__); return; } /* 设置网络为非阻塞,libevent必须的 */ if(!set_nonblocking(client_socket_fd)) { log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!", __LINE__,strerror(errno)); close(client_socket_fd); return; } task_info_t *task_info; task_info = task_pool_pop(); /* 对event_base注册事件回调函数, 注意没有使用EV_PERSIST */ do { task_info->read_type = header; event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info); if(0 != event_base_set(task_info->event_base,&task_info->on_read)) { log_error("File:"__FILE__",Line:%d.Associate the read header event to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info); if(0 != event_base_set(task_info->event_base,&task_info->on_write)) { log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } if(0 != event_add(&task_info->on_read,&task_info->timeout)) { log_error("File:"__FILE__",Line:%d.add the read header event to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } event_base_loop(task_info->event_base,EVLOOP_NONBLOCK); }while(false); return; }