libevent和基于libevent的网络编程

1 libevent介绍和安装

介绍

libevent是一个轻量级的基于事件驱动的高性能的开源网络库,而且支持多个平台,对多个平台的I/O复用技术进行了封装,当咱们编译库的代码时,编译的脚本将会根据OS支持的处理事件机制,来编译相应的代码,从而在libevent接口上保持一致。html

在当前的服务器上,面对的主要问题就是要能处理大量的链接。而经过libevent这个网络库,咱们就能够调用它的API来很好的解决上面的问题。首先,能够来回顾一下,对这个问题的传统解决方法。linux

问题: 如何处理多个客户端链接git

解决方案1:I/O复用技术github

这几种方式都是同步I/O,即当读写事件就绪,他们本身须要负责进行读写,这个读写过程是阻塞的,而异步I/O则不须要本身负责读写,只须要通知负责读写的程序就能够了。数据库

  • 循环
    假设当前我服务器有多个网络链接须要看管,那么我就循环遍历打开的网络链接的列表,来判断是否有要读取的数据。这种方法的缺点很明显,那就是 1.速度缓慢(必须遍历全部的网络链接) 2.效率低 (处理一个链接时可能发生阻塞,妨碍其余网络链接的检查和处理)编程

  • select方式
    select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,而后对每一个被SET的描述符调用进行poll,并记录在临时结果中(fdset),若是有事件发生,select会将临时结果写到用户空间并返回;当轮询一遍后没有任何事件发生时,若是指定了超时时间,则select会睡眠到超时,睡眠结束后再进行一次轮询,并将临时结果写到用户空间,而后返回。
    select返回后,须要逐一检查关注的描述符是否被SET(事件是否发生)。(select支持的文件描述符数量过小了,默认是1024)。api

  • poll方式
    poll与select不一样,经过一个pollfd数组向内核传递须要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只须要被初始化一次。
    poll的实现机制与select相似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,而后对pollfd中的每一个描述符进行poll,相比处理fdset来讲,poll效率更高。
    poll返回后,须要对pollfd中的每一个元素检查其revents值,来得指事件是否发生。数组

  • epoll方式
    epoll经过epoll_create建立一个用于epoll轮询的描述符,经过epoll_ctl添加/修改/删除事件,经过epoll_wait检查事件,epoll_wait的第二个参数用于存放结果。
    epoll与select、poll不一样,首先,其不用每次调用都向内核拷贝事件描述信息,在第一次调用后,事件信息就会与对应的epoll描述符关联起来。其次,epoll不是经过轮询,而是经过在等待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。
    epoll返回后,该参数指向的缓冲区中即为发生的事件,对缓冲区中每一个元素进行处理便可,而不须要像poll、select那样进行轮询检查。服务器

解决方案2:多线程技术或多进程技术网络

多线程技术和多进程技术也能够处理高并发的数据链接,由于在服务器中能够产生大量的进程和线程和处理咱们须要监视的链接。可是,这两种方式也是有很大的局限性的,好比多进程模型就不适合大量的短链接,由于进程的产生和关闭须要消耗较大的系统性能,一样,还要进程进程间的通讯,在CPU性能不足的状况下不太适合。而多线程技术则不太适合处理长链接,由于当咱们创建一个进程时,linux中会消耗8G的栈空间,若是咱们的每一个链接都杵着不断开,那么大量链接长链接后,致使的结果就是内存的大量消耗。

解决方案3:经常使用的上述两者复合使用
上述的两种方法各具备优缺点,所以,咱们能够将上述的方法结合起来,这也是目前使用较多的处理高并发的方法。多进程+I/O复用或者多线程+I/O复用。而在具体的实现上,又能够分为不少的方式。好比多线程+I/O复用技术,咱们使用使用一个主线程负责监听一个端口和接受的描述符是否有读写事件产生,若是有,则将事件分发给其余的工做进程去完成,这也是进程池的理念。

在说完上述的高并发的处理方法以后,咱们能够来介绍一个libevent的主要特点了。

一样,lievent也是采用的上述系统提供的select,poll和epoll方法来进行I/O复用,可是针对于多个系统平台上的不一样的I/O复用实现方式,libevent进行了从新的封装,并提供了统一的API接口。libevent在实现上使用了事件驱动这种机制,其本质上是一种Reactor模式。

Reactor模式,是一种事件驱动机制。应用程序须要提供相应的接口并注册到Reactor上,若是相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

在Libevent中也是同样,向Libevent框架注册相应的事件和回调函数;当这些事件发生时,Libevent会调用这些回调函数处理相应的事件。

lbevent的事件支持三种,分别是网络IO、定时器和信号。定时器的数据结构使用最小堆(Min Heap),以提升效率。网络IO和信号的数据结构采用了双向链表(TAILQ)。

安装

libevent的安装很简单,我是直接从github上clone下一个源码,而后进行编译安装的。

具体的命令是(假设你已经安装了git):

# git clone https://github.com/nmathewson/Libevent.git
  # cd Libevent
  # sh autogen.sh
  # ./configure && make
  # make install
  # make verify  //验证安装

2 Linux下libevent主要API介绍

如今的libevent版本已经到达libevent2了,其增长了多线程的支持,API函数也发生了一些微小的变化。

  • 建立事件集

    struct event_base *event_base_new(void)

  • 建立事件

    struct event event_new(struct event_base ,evutil_socket_t ,short ,event_callback_fn,void*)

    参数一:事件所在的事件集。
    参数二:socket的描述符。
    参数三:事件类型,其中EV_READ表示等待读事件发生,EV_WRITE表示写事件发生,或者它俩的组合,EV_SIGNAL表示须要等待事件的号码,如 果不包含上述的标志,就是超时事件或者手动激活的事件。
    参数四:事件发生时须要调用的回调函数。
    参数五:回调函数的参数值。

  • 添加事件和删除事件

    int event_add(struct event * ev,const struct timeval* timeout)

    参数一:须要添加的事件
    参数二:事件的最大等待事件,若是是NULL的话,就是永久等待

    int event_del(struct event *)
    参数一:须要删除的事件

  • 分配监听事件

    int event_base_dispatch(struct event_base * )

    参数一:须要监视的事件集

  • I/O buffer事件

    struct bufferevent* bufferevent_socket_new
    (struct event_base * base,evutil_socket_t fd,int options)

    参数一:须要添加到的时间集
    参数二:相关的文件描述符
    参数三:0或者是相应的BEV_OPT_*可选标志

    int bufferevent_enable(struct bufferevent * bev,short event)

    参数一:须要启用的bufferevent
    参数二:any combination of EV|READ | EV_WRITE

    int bufferevent_disable(struct bufferevent * bev,short event)

    参数说明:同上

    size_t bufferevent_read(struct bufferevent bev,void data,size_t size)

    参数一:读取的buffer_event事件
    参数二:存储数据的指针
    参数三:数据buffer的大小

    返回值:读取数据的字节数

    int bufferevent_write(struct bufferevent bev,const void data,size_t size)

    参数一:读取的buffer_event事件
    参数二:存储数据的指针
    参数三:要写入的数据的大小,字节数

若是你想知道更多的API使用状况,请点击这里

3.1 编程实例之聊天室服务器

下面,就基于libevent2编写一个聊天室服务器。

设计思想:首先建立一个套接字,进而建立一个事件对此端口进行监听,将所请求的用户组成一个队列,并监听全部的用户事件,当某个用户说话了,产生了读事件,就将该用户的发言发送给队列中的其余用户。

程序分析

须要包含的libevent函数头:

#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>

建立一个client结构体,接受链接后存放数据:

struct client {
/* The clients socket. */
    int fd;

/* The bufferedevent for this client. */
struct bufferevent *buf_ev;
    struct bufferevent *buf_ev;
/*
 * This holds the pointers to the next and previous entries in
 * the tail queue.
 */
    TAILQ_ENTRY(client) entries;
};

先来看下mian函数的处理:

int
main(int argc, char **argv)
{
    int listen_fd;
    struct sockaddr_in listen_addr;
    struct event ev_accept;
    int reuseaddr_on;

    /* Initialize libevent. */
    evbase = event_base_new();

    /* Initialize the tailq. */
    TAILQ_INIT(&client_tailq_head);

    /* Create our listening socket. */
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0)
        err(1, "listen failed");
    memset(&listen_addr, 0, sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(SERVER_PORT);
    
    if (bind(listen_fd, (struct sockaddr *)&listen_addr,
    sizeof(listen_addr)) < 0)
    err(1, "bind failed");
    if (listen(listen_fd, 5) < 0)
    err(1, "listen failed");
    reuseaddr_on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, 
    sizeof(reuseaddr_on));

    /* Set the socket to non-blocking, this is essential in event
     * based programming with libevent. */
    if (setnonblock(listen_fd) < 0)
    err(1, "failed to set server socket to non-blocking");

    /* We now have a listening socket, we create a read event to
    * be notified when a client connects. */
    event_assign(&ev_accept, evbase, listen_fd, EV_READ|EV_PERSIST, 
    on_accept, NULL);
    event_add(&ev_accept, NULL);

    /* Start the event loop. */
    event_base_dispatch(evbase);

    return 0;
}

首先,函数初始化了一个用户队列tailq,接着建立了一个socket套接字,并将套接字设定为非阻塞模式,接着对一个全局的evbase事件集合,注册了事件,事件源是listen_fd,回调函数是on_accept,事件发生的状况是EV_READ,并且标志EV_PESIST代表该事件一直存在,然后开启事件扫描循环event_base_dispatch(evbase)

再看一下回调函数on_accpet实现:

void
on_accept(int fd, short ev, void *arg)
{
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    struct client *client;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (setnonblock(client_fd) < 0)
        warn("failed to set client socket non-blocking");

    /* We've accepted a new client, create a client object. */
    client = calloc(1, sizeof(*client));
    if (client == NULL)
        err(1, "malloc failed");
    client->fd = client_fd;

    client->buf_ev = bufferevent_socket_new(evbase, client_fd, 0);
    bufferevent_setcb(client->buf_ev, buffered_on_read, NULL,
        buffered_on_error, client);

    /* We have to enable it before our callbacks will be
    * called. */
    bufferevent_enable(client->buf_ev, EV_READ);

    /* Add the new client to the tailq. */
    TAILQ_INSERT_TAIL(&client_tailq_head, client, entries);

    printf("Accepted connection from %s\n", 
        inet_ntoa(client_addr.sin_addr));
}

这个回调函数的做用很显然,就是接受了一个客户端的请求,并申请好了一个client信息,将须要的内容填写好,在填写中须要注意的是,又向上述的事件集evbase中注册了一个bufferevent事件client->buf_ev,并注册了回调函数buffered_on_read,buffered_on_error,这三个函数分别是当接受后的链接发生了读或者错误事件后的执行函数。接着,将用户的client结构放入了用户的队列tailq中去。

用户的buffer可读后的执行函数:

void
buffered_on_read(struct bufferevent *bev, void *arg)
{
    struct client *this_client = arg;
    struct client *client;
    uint8_t data[8192];
    size_t n;

    /* Read 8k at a time and send it to all connected clients. */
    for (;;) {
        n = bufferevent_read(bev, data, sizeof(data));
        if (n <= 0) {
            /* Done. */
            break;
        }
    
        /* Send data to all connected clients except for the
         * client that sent the data. */

        TAILQ_FOREACH(client, &client_tailq_head, entries) {
            if (client != this_client) {
                bufferevent_write(client->buf_ev, data,  n);
            }
        }

    }

}

执行函数的做用很明显,将libevent管理中的buffer数据读取出,存入本地的data数组内,而后对队列中的client进行检索,若是不是发数据的client,则将数据写入该client的buffer中,发送给该用户。这里注意的是须要反复读取buffer中的数据,防止一个读取并无读取干净,直到读取不到数据为止。

buffer出错处理函数和上述函数差很少,功能就是出错后,结束掉保存的client结构,详细就不说了。

程序源码: chat-server.c Makefile

编译的时候记得修改Makefile中Libevent文件夹的位置

3.2 编程实例之回显服务器(纯异步IO)

设计思想:所谓回显服务器就是将客户端发过来的数据再发回去,这里主要也就是说明libevent的纯IO复用实现。实现方法和上面的差很少,甚至能够说更加简单。

程序和上面的聊天服务器差很少,只是在buffer可读的事件函数中,不是将用户的数据发送给其余用户,而是直接发送给用户自己。

程序源码: libevent_echosrv_buffered.c Makefile

3.3 编程实例之回显服务器(多线程--per connection per thread)

设计思想:上面的方法单纯使用libevent的简单函数来实现服务,可是这里,咱们假设咱们须要处理的客户端不多,因而咱们可使用对于每一个链接咱们分配一个线程这样的方式来实现对用户的服务。这种方式简单有效,一对一服务,就算业务逻辑出现阻塞也不怕。

程序分析

首先定义了一些数据结构,worker数据结构定义的是一个工做者,它包含有一个工做线程,和结束标志,须要获取的工做队列,和创建链表须要的指针。job数据结构定义的是操做一个job的方法和对象,这回到程序中,实际上就是指的是事件发生后,封装好的client结构体和处理这个结构体的方法。workqueue数据结构指的是当前的工做队列中的工做者,以及工做队列中的待完成的工做,以及互斥锁和条件变量(由于多个工做进程须要访问这些资源)。

具体的流程就是,用一个主线程监听一个套接字,并将套接字接受到的链接accept,并建立一个client数据结构保存该链接的信息,在这个client结构中注册一个bufferevent事件,注册到client->evbase上(这时候这是向client中的evbase注册了一个事件尚未进行循环这个事件集)。

接着,当监听到某个client有bufferevent事件发生,主线程就把该client结构体和须要进行的工做方法包装成一个job结构,而后把这个job扔到workqueue上去,并通知各个工做者。然后,各个工做者开着的线程就被激活了,疯狂地去workqueue上去抢工做作,某个worker拿到工做后,就能够解包job,根据job的工做说明书(job_function)操做工做对象(client)了。这里,job的工做说明有是循环client中的client->evbase,因而这样线程就会一直去监视这个链接的状态,若是有数据就这会调用回调函数进行处理。同时,这个线程也就是阻塞在这里,这对这一个链接负责。

创建workqueue须要的结构体和函数有:

typedef struct worker {
    pthread_t thread;
    int terminate;
    struct workqueue *workqueue;
    struct worker *prev;
    struct worker *next;
} worker_t;

typedef struct job {
    void (*job_function)(struct job *job);
    void *user_data;
    struct job *prev;
    struct job *next;
} job_t;

typedef struct workqueue {
    struct worker *workers;
    struct job *waiting_jobs;
    pthread_mutex_t jobs_mutex;
    pthread_cond_t jobs_cond;
} workqueue_t;

int workqueue_init(workqueue_t *workqueue, int numWorkers);

void workqueue_shutdown(workqueue_t *workqueue);

void workqueue_add_job(workqueue_t *workqueue, job_t *job);

主线程的on_accept函数为:

void on_accept(evutil_socket_t fd, short ev, void *arg) {
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    workqueue_t *workqueue = (workqueue_t *)arg;
    client_t *client;
    job_t *job;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (evutil_make_socket_nonblocking(client_fd) < 0) 
    {
        warn("failed to set client socket to non-blocking");
        close(client_fd);
        return;
    }

    /* Create a client object. */
    if ((client = malloc(sizeof(*client))) == NULL) 
    {
        warn("failed to allocate memory for client state");
        close(client_fd);
        return;
    }
    memset(client, 0, sizeof(*client));
    client->fd = client_fd;

    /* Add any custom code anywhere from here to the end of this function
    * to initialize your application-specific attributes in the client struct.
    */

    if ((client->output_buffer = evbuffer_new()) == NULL) 
    {
        warn("client output buffer allocation failed");
        closeAndFreeClient(client);
        return;
    }

    if ((client->evbase = event_base_new()) == NULL) 
    {
        warn("client event_base creation failed");
        closeAndFreeClient(client);
        return;
    }

    client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, BEV_OPT_CLOSE_ON_FREE);
    if ((client->buf_ev) == NULL) {
        warn("client bufferevent creation failed");
        closeAndFreeClient(client);
        return;
    }
    bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
                  buffered_on_error, client);

    /* We have to enable it before our callbacks will be
     * called. */
    bufferevent_enable(client->buf_ev, EV_READ);

    /* Create a job object and add it to the work queue. */
    if ((job = malloc(sizeof(*job))) == NULL) {
        warn("failed to allocate memory for job state");
        closeAndFreeClient(client);
        return;
    }
    job->job_function = server_job_function;
    job->user_data = client;

    workqueue_add_job(workqueue, job);
}

job中的工做指南为:

static void server_job_function(struct job *job) {
    client_t *client = (client_t *)job->user_data;
    //do my job
    event_base_dispatch(client->evbase);
    
    closeAndFreeClient(client);
    free(job);
}

程序源码: echoserver_threaded.c workqueue.c workqueue.h Makefile

3.4 编程实例之回显服务器(多线程--线程池+异步IO)

设计思想:假设咱们的用户不少,高并发,长链接,那么咱们仍是来用I/O复用和线程池实现吧,用一个控制线程经过I/O复用负责监听和分发事件,用一组线程池来进行处理事件,这样就能够灵活地将控制逻辑和业务逻辑分开了,见下述讲解。

程序分析
具体的流程和上面的差很少,用一个主线程监听一个套接字,并将套接字接受到的链接accept,并建立一个client数据结构保存该链接的信息,在这个client结构中注册一个bufferevent事件,可是这里,将事件注册到accept_evbase中,仍然用主线程进行监听。

而面对监听后出现的事件,将client和操做client的方法打包成一个job,放到上述的workqueue中去,让工做进程来完成。这样的操做和上述的差异在于上述方法将bufferevent注册到client中的evbase中,用工做线程监听,而本方法用主线程监听,工做线程负责处理监听产生的事件。

这要的差异在于两个函数 on_accept函数:

void on_accept(evutil_socket_t fd, short ev, void *arg) {
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    client_t *client;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (evutil_make_socket_nonblocking(client_fd) < 0) {
        warn("failed to set client socket to non-blocking");
        close(client_fd);
        return;
    }

    /* Create a client object. */
    if ((client = malloc(sizeof(*client))) == NULL) {
        warn("failed to allocate memory for client state");
        close(client_fd);
        return;
    }
    memset(client, 0, sizeof(*client));
    client->fd = client_fd;

    /* Add any custom code anywhere from here to the end of this function
     * to initialize your application-specific attributes in the client struct.
    */

    if ((client->output_buffer = evbuffer_new()) == NULL) {
        warn("client output buffer allocation failed");
        closeAndFreeClient(client);
        return;
    }
    //须要注意的是,这里注册到evbase_accept
    client->buf_ev = bufferevent_socket_new(evbase_accept, client_fd,BEV_OPT_CLOSE_ON_FREE);
    if ((client->buf_ev) == NULL) {
        warn("client bufferevent creation failed");
        closeAndFreeClient(client);
        return;
    }
    bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
                  buffered_on_error, client);

    /* We have to enable it before our callbacks will be
    * called. */
    bufferevent_enable(client->buf_ev, EV_READ);
}

在buffered_on_read中,提交job。

void buffered_on_read(struct bufferevent *bev, void *arg) 
{
    client_t *client = (client_t *)arg;
    job_t *job;

    /* Create a job object and add it to the work queue. */
    if ((job = malloc(sizeof(*job))) == NULL) {
        warn("failed to allocate memory for job state");
        closeAndFreeClient(client);
        return;
    }
    job->job_function = server_job_function;
    job->user_data = client;

    workqueue_add_job(&workqueue, job);
}

在job工做指南server_job_function中就能够作你工做该作的事儿了,根据发来的信息进行数据库处理,http返回等等。

程序源码: echoserver_threaded.c workqueue.c workqueue.h Makefile

4 参考文章

[1] http://www.ibm.com/developerworks/cn/aix/library/au-libev/
[2] http://wenku.baidu.com/link?url=RmSm9M9mc4buqB_j6BGou5GxgyAn14lif18UUsQ8gr7pClAKGJr3civ8-DM0Xrpv4MdVIajykzbg34ZbGjGEizL8fOYE-EOKAATZIV06qwa
[3] http://blog.csdn.net/mafuli007/article/details/7476014
[4] http://blog.csdn.net/sparkliang/article/details/4957667
[5] http://bbs.chinaunix.net/thread-4118501-1-1.html
[6] http://bbs.chinaunix.net/thread-3776236-1-1.html
[7] http://www.zhihu.com/question/20114168
[8] http://www.zhihu.com/question/21516827
[9] http://www.wangafu.net/~nickm/libevent-2.0/doxygen/html/
[10] Libevent中文参考手册

相关文章
相关标签/搜索