转载请注明出自 http://www.felix021.com/blog/read.php?2068 ,如是转载文则注明原出处,谢谢:)php
花了两天的时间在libevent上,想总结下,就以写简单tutorial的方式吧,貌似没有一篇简单的说明,让人立刻就能上手用的。html
首先给出官方文档吧: http://libevent.org ,首页有个Programming with Libevent,里面是一节一节的介绍libevent,可是感受信息量太大了,并且仍是英文的-。-(固然,若是想好好用libevent,看看仍是颇有必要的),还有个Reference,大体就是对各个版本的 libevent 使用 doxgen 生成的文档,用来查函数原型和基本用法什么的。web
下面假定已经学习过基本的 socket
编程(socket,bind,listen,accept,connect,recv,send,close),而且对异步/callback
有基本认识。编程
基本的 socket 编程是阻塞/同步的,每一个操做除非已经完成或者出错才会返回,这样对于每个请求,要使用一个线程或者单独的进程去处理,系统资源无法支撑大量的请求(所谓 c10k problem?),例如内存:默认状况下每一个线程须要占用2~8M的栈空间。posix定义了可使用异步的 select 系统调用,可是由于其采用了轮询的方式来判断某个fd是否变成 active,效率不高[O(n)
],链接数一多,也仍是撑不住。因而各系统分别提出了基于异步/callback的系统调用,例如Linux的 epoll
,BSD
的 kqueue
,Windows
的 IOCP
。因为在内核层面作了支持,因此能够用 `O(1)的效率查找到active的fd。基本上,libevent就是对这些高效 IO 的封装,提供统一的API,简化开发。windows
默认状况下是单线程的(能够配置成多线程,若是有须要的话),每一个线程有且只有一个 event_base
,对应一个 struct event_base
结构体(以及附于其上的事件管理器),用来 schedule 托管给它的一系列 event,能够和操做系统的进程管理类比,固然,要更简单一点。当一个事件发生后, event_base
会在合适的时间(不必定是当即)去调用绑定在这个事件上的函数(传入一些预约义的参数,以及在绑定时指定的一个参数),直到这个函数执行完,再返回schedule其余事件。缓存
//建立一个event_base
struct event_base *base = event_base_new();
assert(base != NULL );
event_base
内部有一个循环,循环阻塞在 epoll
/kqueue
等系统调用上,直到有一个一些事件发生,而后去处理这些事件。固然,这些事件要被绑定在这个 event_base上
。每一个事件对应一个struct event,能够是监听一个fd或者 POSIX
信号量之类(这里只讲fd了,其余的看manual吧)。struct event
使用 event_new
来建立和绑定,使用 event_add
来启用:服务器
//建立并绑定一个event
struct event *listen_event;网络
//参数:event_base, 监听的fd,事件类型及属性,绑定的回调函数,给回调函数的参数
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, ( void *)base);多线程
//参数:event,超时时间( struct timeval *类型的, NULL 表示无超时设置)
event_add(listen_event, NULL );并发
注:libevent支持的事件及属性包括(使用bitfield实现,因此要用 | 来让它们合体)
EV_TIMEOUT
: 超时
EV_READ
: 只要网络缓冲中还有数据,回调函数就会被触发
EV_WRITE
: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
EV_SIGNAL
: POSIX
信号量,参考manual吧
EV_PERSIST
: 不指定这个属性的话,回调函数被触发后事件会被删除
EV_ET
: Edge-Trigger边缘触发,参考EPOLL_ET
而后须要启动event_base的循环,这样才能开始处理发生的事件。循环的启动使用 event_base_dispatch
,循环将一直持续,直到再也不有须要关注的事件,或者是遇到 event_loopbreak()
/ event_loopexit()
函数。
//启动事件循环
event_base_dispatch(base);
接下来关注下绑定到 event
的回调函数 callback_func
:传递给它的是一个 socket fd
、一个 event类型及属性 bit_field
、以及传递给 event_new
的最后一个参数(去上面几行回顾一下,把 event_base
给传进来了,实际上更多地是分配一个结构体,把相关的数据都撂进去,而后丢给event_new,在这里就能取获得了)。其原型是:
typedef void (* event_callback_fn)(evutil_socket_t sockfd, short event_type, void *arg)
对于一个服务器而言,上面的流程大概是这样组合的:
listener = socket(),bind(),listen(),设置nonblocking(POSIX系统中可以使用fcntl设置,windows不须要设置,实际上libevent提供了统一的包装evutil_make_socket_nonblocking)
建立一个event_base
建立一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及须要给它的参数)。对于listener socket来讲,只须要监听EV_READ|EV_PERSIST
启用该事件
进入事件循环
(异步) 当有client发起请求的时候,调用该回调函数,进行处理。
QUESTION : 为何不在listen完立刻调用accept,得到客户端链接之后再丢给event_base呢?这个问题先想一想噢。
ANSWER : 回调函数要作什么事情呢?固然是处理client的请求了。首先要accept,得到一个能够与client通讯的sockfd,而后……调用recv/send吗?错!大错特错!若是直接调用recv/send的话,这个线程就阻塞在这个地方了,若是这个客户端很是的阴险(好比一直不发消息,或者网络很差,总是丢包),libevent就只能等它,无法处理其余的请求了——因此应该建立一个新的event来托管这个sockfd。
magichan 注释: 这段话应该是说
accpet
得到一个 socket 时,不会放在这个回调函数自己
处理这个套接字发过来的数据,而是把该套接字绑定到一个的新event
中,再放回事件循环之中。
若是该 socket 有数据发送过来,那么就触发新的event
处理数据。
在老版本libevent上的实现,比较罗嗦[若是不想详细了解的话,看下一部分]。
对于服务器但愿先从client获取数据的状况,大体流程是这样的:
将这个sockfd设置为 nonblocking
建立2个event:
event_read
,绑上sockfd的 EV_READ|EV_PERSIST
,设置回调函数和参数(后面提到的struct)
event_write
,绑上sockfd的 EV_WRITE|EV_PERSIST
,设置回调函数和参数(后面提到的struct)
启用 event_read
事件
(异步) 等待 event_read
事件的发生, 调用相应的回调函数。这里麻烦来了:回调函数用recv读入的数据,不能直接用 send 丢给 sockfd 了事——由于 sockfd 是 nonblocking 的,丢给它的话,不能保证正确(为何呢?)。因此须要一个本身管理的缓存用来保存读入的数据中(在accept之后就建立一个struct,做为第2步回调函数的arg传进来),在合适的时间(好比遇到换行符)启用 event_write
事件【event_add(event_write, NULL)】,等待 EV_WRITE
事件的触发
(异步) 当 event_write 事件的回调函数被调用的时候,往 sockfd 写入数据,而后删除 event_write 事件【event_del(event_write)】,等待event_read事件的下一次执行。
以上步骤比较晦涩,具体代码可参考官方文档里面的【Example: A low-level ROT13 server with Libevent】
因为须要本身管理缓冲区,且过程晦涩难懂,而且不兼容于Windows的IOCP,因此libevent2开始,提供了bufferevent这个神器,用来提供更加优雅、易用的API。struct bufferevent
内建了两个 event(read/write)
和对应的缓冲区【struct evbuffer *input
, *output
】,并提供相应的函数用来操做缓冲区(或者直接操做bufferevent
)。每当有数据被读入input的时候,read_cb
函数被调用;每当utput
被输出完的时候,write_cb
被调用;在网络IO操做出现错误的状况(链接中断、超时、其余错误),error_cb
被调用。因而上一部分的步骤被简化为:
设置 sockfd 为 nonblocking
使用 bufferevent_socket_new
建立一个 struct bufferevent *bev,关联该sockfd,托管给event_base
使用 bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg)
将EV_READ/EV_WRITE 对应的函数
使用 bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)
来启用 read/write 事件
(异步)
在 read_cb
里面从 input 读取数据,处理完毕后塞到 output 里(会被自动写入到sockfd)
在 write_cb
里面(须要作什么吗?对于一个echo server来讲,read_cb 就足够了)
在 error_cb 里面处理遇到的错误 能够 使用 bufferevent_set_timeouts(bev, struct timeval *READ, struct timeval *WRITE)
来设置读写超时, 在 error_cb 里面处理超时。
read_cb
和 write_cb
的原型是
void read_or_write_callback(struct bufferevent *bev, void *arg)
error_cb
的原型是
void error_cb(struct bufferevent *bev, short error, void *arg) //这个是event的标准回调函数原型
能够从bev中用libevent的API提取出event_base、sockfd、input/output等相关数据,详情RTFM~
因而代码简化到只须要几行的read_cb和error_cb函数便可:
void read_cb( struct bufferevent *bev, void *arg) {
char line[256];
int n;
evutil_socket_t fd = bufferevent_getfd(bev);
while (n = bufferevent_read(bev, line, 256), n > 0)
bufferevent_write(bev, line, n);
}
void error_cb( struct bufferevent *bev, short event, void *arg) {
bufferevent_free(bev);
}
因而一个支持大并发量的echo server就成型了!下面附上无注释的echo server源码,110行,多抄几遍,就能彻底弄懂啦!更复杂的例子参见官方文档里面的【Example: A simpler ROT13 server with Libevent】
# include <stdio.h>
# include <stdlib.h>
# include <errno.h>
# include <assert.h>
# include <event2/event.h>
# include <event2/bufferevent.h>
# define LISTEN_PORT 9999
# define LISTEN_BACKLOG 32
void do_accept(evutil_socket_t listener, short event, void *arg);
void read_cb( struct bufferevent *bev, void *arg);
void error_cb( struct bufferevent *bev, short event, void *arg);
void write_cb( struct bufferevent *bev, void *arg);
int main( int argc, char *argv[]) {
int ret;
evutil_socket_t listener;
listener = socket(AF_INET, SOCK_STREAM, 0);
assert(listener > 0);
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(LISTEN_PORT);
if (bind(listener, ( struct sockaddr *)&sin, sizeof (sin)) < 0) {
perror("bind");
return 1;
}
if (listen(listener, LISTEN_BACKLOG) < 0) {
perror("listen");
return 1;
}
printf ("Listening...\n");
evutil_make_socket_nonblocking(listener);
struct event_base *base = event_base_new();
assert(base != NULL );
struct event *listen_event;
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, ( void *)base);
event_add(listen_event, NULL );
event_base_dispatch(base);
printf ("The End.");
return 0;
}
void do_accept(evutil_socket_t listener, short event, void *arg) {
struct event_base *base = ( struct event_base *)arg;
evutil_socket_t fd;
struct sockaddr_in sin;
socklen_t slen = sizeof (sin);
fd = accept(listener, ( struct sockaddr *)&sin, &slen);
if (fd < 0) {
perror("accept");
return ;
}
if (fd > FD_SETSIZE) { //这个 if 是参考了那个ROT13的例子,貌似是官方的疏漏,从select-based例子里抄过来忘了改
perror("fd > FD_SETSIZE\n");
return ;
}
printf ("ACCEPT: fd = %u\n", fd);
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL , error_cb, arg);
bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);
}
void read_cb( struct bufferevent *bev, void *arg) {
# define MAX_LINE 256
char line[MAX_LINE+1];
int n;
evutil_socket_t fd = bufferevent_getfd(bev);
while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {
line[n] = '\0';
printf ("fd=%u, read line: %s\n", fd, line);
bufferevent_write(bev, line, n);
}
}
void write_cb( struct bufferevent *bev, void *arg) {}
void error_cb( struct bufferevent *bev, short event, void *arg) {
evutil_socket_t fd = bufferevent_getfd(bev);
printf ("fd = %u, ", fd);
if (event & BEV_EVENT_TIMEOUT) {
printf ("Timed out\n"); // if bufferevent_set_timeouts() called
}
else if (event & BEV_EVENT_EOF) {
printf ("connection closed\n");
}
else if (event & BEV_EVENT_ERROR) {
printf ("some other error\n");
}
bufferevent_free(bev);
}
--