Libevent 2 提供了 bufferevent 接口,简化了编程的难度,bufferevent 其实是对底层事件核心的封装,所以学习 bufferevent 的实现是研究 Libevent 底层 event、event_base 用法的一个好办法。本文假定你已经对 Libevent 有必定的认识,不然能够先阅读我关于 Libevent 的介绍:
Libevent(1)— 简介、编译、配置
Libevent(2)— event、event_base
Libevent(3)— 基础库
Libevent(4)— Bufferevent
Libevent(5)— 链接监听器html
参考文献列表:
http://www.wangafu.net/~nickm/libevent-book/编程
此文编写的时候,使用到的 Libevent 为 2.0.21。本文略过了关于 event 优先权和超时相关的讨论。ubuntu
建立和销毁 event_base
event_base 是首先须要被建立出来的对象。event_base 结构持有了一个 event 集合。若是 event_base 被设置了使用锁,那么它在多个线程中能够安全的访问。可是对 event_base 的循环(下面会立刻解释什么是“对 event_base 的循环”)只能在某个线程中执行。若是你但愿多个线程进行循环,那么应该作的就是为每个线程建立一个 event_base。后端
event_base 存在多个后端能够选择(咱们也把 event_base 后端叫作 event_base 的方法):安全
- select
- poll
- epoll
- kqueue
- devpoll
- evport
- win32
在咱们建立 event_base 的时候,Libevent 会为咱们选择最快的后端。建立 event_base 经过函数 event_base_new() 来完成:服务器
- // 建立成功返回一个拥有默认设置的 event base
- // 建立失败返回 NULL
- struct event_base *event_base_new(void);
-
- // 查看 event_base 实际上使用到的后端
- const char *event_base_get_method(const struct event_base *base);
对于大多数程序来讲,默认设置已经够用了。多线程
event_base 的释放使用函数:异步
- void event_base_free(struct event_base *base);
事件循环(event loop)
event_base 会持有一组 event(这是咱们前面说到的),换而言之就是说,咱们能够向 event_base 中注册 event(具体如何注册本文先不谈)。若是咱们向 event_base 中注册了一些 event,那么就可让 Libevent 开始事件循环了:socket
- // 指定一个 event_base 并开始事件循环
- // 此函数内部被实现为一个不断进行的循环
- // 此函数返回 0 表示成功退出
- // 此函数返回 -1 表示存在未处理的错误
- int event_base_dispatch(struct event_base *base);
默认的事件循环会在如下的状况中止(也就是 event_base_dispatch 会返回):函数
- 若是 base 中没有 event,那么事件循环将中止
- 调用 event_base_loopbreak(),那么事件循环将中止
- 调用 event_base_loopexit(),那么事件循环将中止
- 若是出现错误,那么事件循环将中止
事件循环会检测是否存在活跃事件(以前已经介绍过活跃事件这一术语:http://name5566.com/4190.html),若存在活跃事件,那么调用事件对应的回调函数。
中止事件循环的能够经过移除 event_base 中的 event 来实现。若是你但愿在 event_base 中存在 event 的状况下中止事件循环,能够经过如下函数完成:
- // 这两个函数成功返回 0 失败返回 -1
- // 指定在 tv 时间后中止事件循环
- // 若是 tv == NULL 那么将无延时的中止事件循环
- int event_base_loopexit(struct event_base *base,
- const struct timeval *tv);
- // 当即中止事件循环(而不是无延时的中止)
- int event_base_loopbreak(struct event_base *base);
这里须要区别一下 event_base_loopexit(base, NULL) 和 event_base_loopbreak(base):
- event_base_loopexit(base, NULL) 若是当前正在为多个活跃事件调用回调函数,那么不会当即退出,而是等到全部的活跃事件的回调函数都执行完成后才退出事件循环
- event_base_loopbreak(base) 若是当前正在为多个活跃事件调用回调函数,那么当前正在调用的回调函数会被执行,而后立刻退出事件循环,而并不处理其余的活跃事件了
有时候,咱们须要在事件的回调函数中获取当前的时间,这时候你不须要调用 gettimeofday() 而是使用 event_base_gettimeofday_cached()(你的系统可能实现 gettimeofday() 为一个系统调用,你能够避免系统调用的开销):
- // 获取到的时间为开始执行此轮事件回调函数的时间
- // 成功返回 0 失败返回负数
- int event_base_gettimeofday_cached(struct event_base *base,
- struct timeval *tv_out);
若是咱们须要(为了调试)获取被注册到 event_base 的全部的 event 和它们的状态,调用 event_base_dump_events() 函数:
- // f 表示输出内容的目标文件
- void event_base_dump_events(struct event_base *base, FILE *f);
event
如今开始详细的讨论一下 event。在 Libevent 中 event 表示了一组条件,例如:
- 一个文件描述符可读或者可写
- 超时
- 出现一个信号
- 用户触发了一个事件
event 的相关术语:
- 一个 event 经过 event_new() 建立出来,那么它是已初始化的,另外 event_assign() 也被用来初始化 event(可是有它特定的用法)
- 一个 event 被注册到(经过 add 函数添加到)一个 event_base 中,其为 pending 状态
- 若是 pending event 表示的条件被触发了,那么此 event 会变为活跃的,其为 active 状态,则 event 相关的回调函数被调用
- event 能够是 persistent(持久的)也能够是非 persistent 的。event 相关的回调函数被调用以后,只有 persistent event 会继续转为 pending 状态。对于非 persistent 的 event 你能够在 event 相关的事件回调函数中调用 event_add() 使得此 event 转为 pending 状态
event 经常使用 API:
-
// 定义了各类条件
// 超时
#define EV_TIMEOUT 0x01
// event 相关的文件描述符能够读了
#define EV_READ 0x02
// event 相关的文件描述符能够写了
#define EV_WRITE 0x04
// 被用于信号检测(详见下文)
#define EV_SIGNAL 0x08
// 用于指定 event 为 persistent
#define EV_PERSIST 0x10
// 用于指定 event 会被边缘触发(Edge-triggered 可参考 http://name5566.com/3818.html)
#define EV_ET 0x20
// event 的回调函数
// 参数 evutil_socket_t --- event 关联的文件描述符
// 参数 short --- 当前发生的条件(也就是上面定义的条件)
// 参数 void* --- 其为 event_new 函数中的 arg 参数
typedef void (*event_callback_fn)(evutil_socket_t, short, void *);
// 建立 event
// base --- 使用此 event 的 event_base
// what --- 指定 event 关心的各类条件(也就是上面定义的条件)
// fd --- 文件描述符
// cb --- event 相关的回调函数
// arg --- 用户自定义数据
// 函数执行失败返回 NULL
struct event *event_new(struct event_base *base, evutil_socket_t fd,
short what, event_callback_fn cb,
void *arg);
// 释放 event(真正释放内存,对应 event_new 使用)
// 能够用来释放由 event_new 分配的 event
// 若 event 处于 pending 或者 active 状态释放也不会存在问题
void event_free(struct event *event);
// 清理 event(并非真正释放内存)
// 可用于已经初始化的、pending、active 的 event
// 此函数会将 event 转换为非 pending、非 active 状态的
// 函数返回 0 表示成功 -1 表示失败
int event_del(struct event *event);
// 用于向 event_base 中注册 event
// tv 用于指定超时时间,为 NULL 表示无超时时间
// 函数返回 0 表示成功 -1 表示失败
int event_add(struct event *ev, const struct timeval *tv);
一个范例:
-
#include <event2/event.h>
// event 的回调函数
void cb_func(evutil_socket_t fd, short what, void *arg)
{
const char *data = arg;
printf("Got an event on socket %d:%s%s%s%s [%s]",
(int) fd,
(what&EV_TIMEOUT) ? " timeout" : "",
(what&EV_READ) ? " read" : "",
(what&EV_WRITE) ? " write" : "",
(what&EV_SIGNAL) ? " signal" : "",
data);
}
void main_loop(evutil_socket_t fd1, evutil_socket_t fd2)
{
struct event *ev1, *ev2;
struct timeval five_seconds = {5, 0};
// 建立 event_base
struct event_base *base = event_base_new();
// 这里假定 fd一、fd2 已经被设置好了(它们都被设置为非阻塞的)
// 建立 event
ev1 = event_new(base, fd1, EV_TIMEOUT | EV_READ | EV_PERSIST, cb_func,
(char*)"Reading event");
ev2 = event_new(base, fd2, EV_WRITE | EV_PERSIST, cb_func,
(char*)"Writing event");
// 注册 event 到 event_base
event_add(ev1, &five_seconds);
event_add(ev2, NULL);
// 开始事件循环
event_base_dispatch(base);
}
Libevent 可以处理信号。信号 event 相关的函数:
- // base --- event_base
- // signum --- 信号,例如 SIGHUP
- // callback --- 信号出现时调用的回调函数
- // arg --- 用户自定义数据
- #define evsignal_new(base, signum, callback, arg) \
- event_new(base, signum, EV_SIGNAL|EV_PERSIST, cb, arg)
-
- // 将信号 event 注册到 event_base
- #define evsignal_add(ev, tv) \
- event_add((ev),(tv))
-
- // 清理信号 event
- #define evsignal_del(ev) \
- event_del(ev)
在一般的 POSIX 信号处理函数中,很多函数是不能被调用的(例如,不可重入的函数),可是在 Libevent 中却没有这些限制,由于信号 event 设定的回调函数运行在事件循环中。另外须要注意的是,在同一个进程中 Libevent 只能容许一个 event_base 监听信号。
性能相关问题
Libevent 提供了咱们机制来重用 event 用以免 event 在堆上的频繁分配和释放。相关的接口:
- // 此函数用于初始化 event(包括能够初始化栈上和静态存储区中的 event)
- // event_assign() 和 event_new() 除了 event 参数以外,使用了同样的参数
- // event 参数用于指定一个未初始化的且须要初始化的 event
- // 函数成功返回 0 失败返回 -1
- int event_assign(struct event *event, struct event_base *base,
- evutil_socket_t fd, short what,
- void (*callback)(evutil_socket_t, short, void *), void *arg);
-
- // 相似上面的函数,此函数被信号 event 使用
- #define evsignal_assign(event, base, signum, callback, arg) \
- event_assign(event, base, signum, EV_SIGNAL|EV_PERSIST, callback, arg)
已经初始化或者处于 pending 的 event,首先须要调用 event_del() 后再调用 event_assign()。
这里咱们进一步认识一下 event_new()、event_assign()、event_free()、event_del()
event_new() 实际上完成了两件事情:
- 经过内存分配函数在堆上分配了 event
- 使用 event_assign() 初始化了此 event
event_free() 实际上完成了两件事情:
- 调用 event_del() 进行 event 的清理工做
- 经过内存分配函数在堆上释放此 event
创建链接:
// address 和 addrlen 和标准的 connect 函数的参数没有区别
// 若是 bufferevent bev 没有设置 socket(在建立时能够设置 socket)
// 那么调用此函数将分配一个新的 socket 给 bev
// 链接成功返回 0 失败返回 -1
int bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *address, int addrlen);
简单的一个范例:
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>
// 事件回调函数
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
// 链接成功创建
if (events & BEV_EVENT_CONNECTED) {
/* We're connected to 127.0.0.1:8080. Ordinarily we'd do
something here, like start reading or writing. */
// 出现错误
} else if (events & BEV_EVENT_ERROR) {
/* An error occured while connecting. */
}
}
int main_loop(void)
{
struct event_base *base;
struct bufferevent *bev;
struct sockaddr_in sin;
base = event_base_new();
// 初始化链接地址
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sin.sin_port = htons(8080); /* Port 8080 */
// 建立一个基于 socket 的 bufferevent
// 参数 -1 表示并不为此 bufferevent 设置 socket
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
// 为 bufferevent bev 设置回调函数
// 这里仅仅设置了事件回调函数
// 后面会详细谈及此函数
bufferevent_setcb(bev, NULL, NULL, eventcb, NULL); //setcb cb表示callback,回调
// 进行链接
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
bufferevent_free(bev);
return -1;
}
// 开始事件循环
event_base_dispatch(base);
return 0;
}
更多的 bufferevent API
释放 bufferevent
- // 若是存在未完成的延时回调,bufferevent 会在回调完成后才被真正释放
- void bufferevent_free(struct bufferevent *bev);
bufferevent 回调函数的设置和获取
/ 读取、写入回调函数原型
// ctx 为用户自定义数据(由 bufferevent_setcb 设定)
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
// 事件回调函数原型
// ctx 为用户自定义数据(由 bufferevent_setcb 设定)
// events 选项能够为:
// BEV_EVENT_READING --- 在 bufferevent 上进行读取操做时出现了一个事件
// BEV_EVENT_WRITING --- 在 bufferevent 上进行写入操做时出现了一个事件
// BEV_EVENT_ERROR --- 进行 bufferevent 操做时(例如调用 bufferevent API)出错,获取详细的错误信息使用 EVUTIL_SOCKET_ERROR()
// BEV_EVENT_TIMEOUT --- 在 bufferevent 上出现了超时
// BEV_EVENT_EOF --- 在 bufferevent 上遇到了文件结束符
// BEV_EVENT_CONNECTED --- 在 bufferevent 上请求链接完成了
typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);
// 设置回调函数
// 若是但愿禁用回调函数,那么设置对应的参数为 NULL
void bufferevent_setcb(
// bufferevent
struct bufferevent *bufev,
// 读取回调函数
bufferevent_data_cb readcb,
// 写入回调函数
bufferevent_data_cb writecb,
// 事件回调函数
bufferevent_event_cb eventcb,
// 用户定义的数据
// 这三个回调函数均共享此参数
void *cbarg
);
// 取回回调函数
// 参数为 NULL 表示忽略
void bufferevent_getcb(
struct bufferevent *bufev,
bufferevent_data_cb *readcb_ptr,
bufferevent_data_cb *writecb_ptr,
bufferevent_event_cb *eventcb_ptr,
void **cbarg_ptr
);
设置 watermark
- // events 参数能够为
- // EV_READ 表示设置 read watermark
- // EV_WRITE 表示设置 write watermark
- // EV_READ | EV_WRITE 表示设置 read 以及 write watermark
- void bufferevent_setwatermark(struct bufferevent *bufev, short events,
- size_t lowmark, size_t highmark);
获取到输入和输出 buffer
- // 获取到输入 buffer
- struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
- // 获取到输出 buffer
- struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
添加数据到输出 buffer
- // 下面两个函数执行成功返回 0 失败返回 -1
-
- // 向 bufev 的输出 buffer 中添加大小为 size 的数据
- // 数据的首地址为 data
- int bufferevent_write(struct bufferevent *bufev,
- const void *data, size_t size);
- // 向 bufev 的输出 buffer 中添加数据
- // 数据来源于 buf
- // 此函数会清除 buf 中的全部数据
- int bufferevent_write_buffer(struct bufferevent *bufev,
- struct evbuffer *buf);
从输入 buffer 中获取数据
- // 从 bufev 的输入 buffer 中获取最多 size 字节的数据保存在 data 指向的内存中
- // 此函数返回实际读取的字节数
- size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
- // 获取 bufev 的输入 buffer 中的全部数据并保存在 buf 中
- // 此函数成功返回 0 失败返回 -1
- int bufferevent_read_buffer(struct bufferevent *bufev,
- struct evbuffer *buf);
关于 bufferevent 的一些高级话题,能够参考:http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html
evbuffers
evbuffer 是一个队列,在其尾部添加数据和在其头部删除数据均被优化了。evbuffer 相关的 API 在这里能够查看:http://www.wangafu.net/~nickm/libevent-book/Ref7_evbuffer.html
参考文献列表:
http://www.wangafu.net/~nickm/libevent-book/
此文编写的时候,使用到的 Libevent 为 2.0.21
Libevent 提供了链接监听器 evconnlistener
建立 evconnlistener 实例
// 链接监听器回调函数原型
typedef void (*evconnlistener_cb)(
struct evconnlistener *listener,
// 新的 socket
evutil_socket_t sock,
// 新的 socket 对应的地址
struct sockaddr *addr,
int len,
// 用户自定义数据
void *ptr
);
// 建立一个新的链接监听器
struct evconnlistener *evconnlistener_new(
struct event_base *base,
// 一个新的链接到来时此回调被调用
evconnlistener_cb cb,
// 用户自定义数据,会被传递给 cb 回调函数
void *ptr,
// 链接监听器的选项(下面会详细谈到)
unsigned flags,
// 为标准的 listen 函数的 backlog 参数
// 若是为负数,Libevent 将尝试选择一个合适的值
int backlog,
// socket
// Libevent 假定此 socket 已经绑定
evutil_socket_t fd
);
// 建立一个新的链接监听器
// 大多数参数含义同于 evconnlistener_new
struct evconnlistener *evconnlistener_new_bind(
struct event_base *base,
evconnlistener_cb cb,
void *ptr,
unsigned flags,
int backlog,
// 指定须要绑定的 socket 地址
const struct sockaddr *sa,
int socklen
);
链接监听器的经常使用选项以下:
- LEV_OPT_CLOSE_ON_FREE
当关闭链接监听器其底层 socket 也被自动释放
- LEV_OPT_REUSEABLE
设置 socket 绑定的地址能够重用
- LEV_OPT_THREADSAFE
设置链接监听器为线程安全的
释放链接监听器
- void evconnlistener_free(struct evconnlistener *lev);
错误检测
若是链接监听器出错,咱们能够获得通知:
- // 链接监听器错误回调函数原型
- typedef void (*evconnlistener_errorcb)(struct evconnlistener *lis, void *ptr);
-
- // 为链接监听器设置错误回调函数
- void evconnlistener_set_error_cb(struct evconnlistener *lev,
- evconnlistener_errorcb errorcb);
一个详细的范例(echo 服务器)
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
// 读取回调函数
static void
echo_read_cb(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input = bufferevent_get_input(bev);
struct evbuffer *output = bufferevent_get_output(bev);
// 将输入缓冲区的数据直接拷贝到输出缓冲区
evbuffer_add_buffer(output, input);
}
// 事件回调函数
static void
echo_event_cb(struct bufferevent *bev, short events, void *ctx)
{
if (events & BEV_EVENT_ERROR)
perror("Error from bufferevent");
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
bufferevent_free(bev);
}
}
// 链接监听器回调函数
static void
accept_conn_cb(struct evconnlistener *listener,
evutil_socket_t fd, struct sockaddr *address, int socklen,
void *ctx)
{
// 为新的链接分配并设置 bufferevent
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev = bufferevent_socket_new(
base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, echo_read_cb, NULL, echo_event_cb, NULL);
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
// 链接监听器错误回调函数
static void
accept_error_cb(struct evconnlistener *listener, void *ctx)
{
struct event_base *base = evconnlistener_get_base(listener);
// 获取到错误信息
int err = EVUTIL_SOCKET_ERROR();
fprintf(stderr, "Got an error %d (%s) on the listener. "
"Shutting down.\n", err, evutil_socket_error_to_string(err));
// 退出事件循环
event_base_loopexit(base, NULL);
}
int
main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct sockaddr_in sin;
int port = 9876;
if (argc > 1) {
port = atoi(argv[1]);
}
if (port<=0 || port>65535) {
puts("Invalid port");
return 1;
}
base = event_base_new();
if (!base) {
puts("Couldn't open event base");
return 1;
}
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0);
sin.sin_port = htons(port);
listener = evconnlistener_new_bind(base, accept_conn_cb, NULL,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
(struct sockaddr*) & sin, sizeof(sin));
if (!listener) {
perror("Couldn't create listener");
return 1;
}
evconnlistener_set_error_cb(listener, accept_error_cb);
event_base_dispatch(base);
return 0;
}
ubuntu下头文件:
#include<string.h>
#include<stdlib.h>
#include<stdio.h>
#include<errno.h>
#include<netinet/in.h>
#include<event2/listener.h>
#include<event2/bufferevent.h>
#include<event2/buffer.h>
关于echo 的读的客户端看:
http://blog.sina.com.cn/s/blog_87d946d40100vrv9.html
Libevent(3)— 基础库
参考文献列表:
http://www.wangafu.net/~nickm/libevent-book/
此文编写的时候,使用到的 Libevent 为 2.0.21
经常使用基本数据类型
时间相关
Socket API
字符串相关
安全的随机数生成
Libevent(4)— Bufferevent
此文编写的时候,使用到的 Libevent 为 2.0.21
Buffer IO 模式
bufferevent 提供给咱们一种 Buffer IO 模式(这里以写入数据为例):
每个 bufferevent 都包含了一个输入 buffer 和一个输出 buffer,它们的类型为 evbuffer(结构体)。当咱们向 bufferevent 写入数据的时候,实际上数据首先被写入到了输出 buffer,当 bufferevent 有数据可读时,咱们其实是从输入 buffer 中获取数据。
目前 bufferevent 目前仅仅支持 stream-oriented 的协议(例如 TCP)并不支持 datagram-oriented 协议(例如 UDP)。一个 bufferevent 的实例负责一个特定的链接上的数据收发。
Libevent 能够按须要建立多种类型的 bufferevent:
bufferevent 的回调函数
每一个 bufferevent 实例能够有 3 个回调函数(经过接口 bufferevent_setcb 设置):
对于 buffer 的读、写和回调行为能够经过几个参数来配置,这几个参数在 Libevent 中被叫作 watermark(水位标记,咱们能够将 buffer 想象为一个水池,水位标记用于标记水池中水的多少,也就是说,watermark 用于标记 buffer 中的数据量)。watermark 被实现为整数(类型为 size_t),有几种类型的 watermark:
在一些特殊需求中(详细并不讨论),咱们可能须要回调函数被延时执行(这种被延时的回调函数被叫作 Deferred callbacks)。延时回调函数会在事件循环中排队,并在普通事件回调函数(regular event’s callback)以后被调用。
从基于 socket 的 bufferevent 开始认识 bufferevent
建立基于 socket 的 bufferevent:
buffervent 的选项能够用来改变 bufferevent 的行为。可用的选项包括:
当 bufferevent 被释放同时关闭底层(socket 被关闭等)
为 bufferevent 自动分配锁,这样可以在多线程环境中安全使用
当设置了此标志,bufferevent 会延迟它的全部回调(参考前面说的延时回调)
若是 bufferevent 被设置为线程安全的,用户提供的回调被调用时 bufferevent 的锁会被持有
若是设置了此选项,Libevent 将在调用你的回调时释放 bufferevent 的锁