Libevent教程001: 简介与配置

本文内容大体翻译自 libevent-book, 但不是照本翻译. 成文时, libevent最新的稳定版为 2.1.8 stable. 即本文如无特殊说明, 全部描述均以 2.1.8 stable 版本为准.git

本文为系列文章的第一篇, 对应libevent-book的 chapter 0 + chapter 1 + R0 + R1程序员

0. 前提条件

这个文档是对libevent的介绍与指导, 阅读文档须要你具备如下的能力:github

  1. 你精通C语言
  2. 你至少了解Unix网络编程.
  3. 你会安装libevent
  4. 你大体知道libevent是干什么用的.

1. 基本概念: 阻塞/非阻塞/同步/异步/回调机制的讨论

这里首先要解释四个名词: 阻塞, 非阻塞, 同步, 异步. 它们都是修饰"接口"的形容词, 或者说的土一点, 它们都是修饰"函数"的形容词.redis

同步, 仍是异步, 是从"消息通讯"的视角去描述这个接口的行为. 而所谓的消息通讯, 你能够简单的把"函数"想象成一个淘宝客服, 把"调用方"想象成你本身. 调用函数的过程其实就是三步:编程

  1. "你"询问"淘宝客服"一个问题. 好比, "在吗?". 在这个场景中, 你就是"调用方", "淘宝客服"是函数, 而那句"在吗?", 则是函数参数, 你把函数参数传递给函数.
  2. "淘宝客服"进行后台处理. 这时淘宝客服接收到了你的询问消息, 若是他没有在忙, 那么他能够当即回复你. 若是他如今正在忙, 好比正在吃饭, 好比正在和老婆吵架, 好比淘宝客服须要先看一下你以前的行为记录, 而后再决定如何回复你.(好比他看到你正在浏览一双袜子,以为你在潜在的买家, 他决定回复你. 好比他看到你三天前下单买了一双袜子, 但袜子尚未发货, 他以为你有退货的风险, 从而决定不理你, 伪装不在.) 这个客服思考决断的过程, 就是函数内部进行处理运算的过程. 固然这个例子很简单, 有些牵强.
  3. 最终, 淘宝客服回复了你, "在的, 亲". 这里, 回复这个动做, 就是函数返回, 而"在的, 亲"这句话, 就是函数的返回值.

你从这个角度去看, 函数调用, 就是消息通讯的过程, 你发送消息给函数, 函数通过一番运算思考, 把结果再回发给你.小程序

所谓的同步, 异步, 指的是:windows

  1. 这个淘宝客服很老实, 对于每一个顾客发来的问题, 他都须要通过一番思考, 再进行答复. 这个函数很老实, 对于每一个函数调用, 都很老实的根据传入参数进行计算, 再返回结果. 也是是说, 在淘宝客服思考结束以前, 这个客服不会向你发送答复, 你也收不到答复. 也就是说, 在函数运算结束以前, 函数不会返回, 你也得不到返回值. 那么, 这个客服是同步的, 这个函数调用的过程是同步调用, 这个函数是同步的.
  2. 假如这个淘宝客服很不老实, 他装了一个自动答复小程序. 对于每一个询问的顾客, 都先自动回复一句"亲, 如今很忙哟, 客服MM可能过一会才能给你答复". 也就是说, 顾客在发出询问以后, 当即就能获得一个答复. 也就是说, 调用方在调用一个函数以后, 这个函数就当即返回了. 而真正的结果, 可能在过五分钟以后才会给你. 便是五分钟以后客服对你说"在的呢, 亲". 这样的函数, 就叫异步函数.

异步客服须要解决一个问题: 当真正的运算结果得出以后, 被调用的客服如何通知做为调用方的你, 取走答案. 在淘宝客户端上, 是经过手机的震动消息提醒, 是经过聊天框的红点.api

因此, 关于同步, 和异步, 这里作一个稍微正式一点的总结:数组

  1. 同步的过程: 调用方传参->函数运算->函数返回运算结果.
  2. 异步的过程: 调用方传参->函数说我知道了, 而后过了五分钟, 函数说我算出来了, 结果在这里, 你来取.

这里咱们着眼于消息的传递, 通信方式, 也就是站在函数的角度去看, 结果是如何传递给调用方的. 同步接口, 运算结果在"函数调用"这个场景下就返回给了调用方. 异步接口: 运算结果在"函数调用"这个场景以后的某个不定的时刻, 经过某种通知方式, 传递给调用方.安全

整个过程当中咱们忽略了一件事: 就是, 在函数执行运算的过程当中, 调用方在干什么. 也是是, 在淘宝客服心里思考如何回复你的时候, 你在干什么.

这就引出了阻塞与非阻塞:

  1. 阻塞: 在函数执行运算的过程当中, 当前线程什么也作不了. 在等待客服回复的过程当中, 你什么也不作, 就在那干等着, 直到他回复了你.
  2. 非阻塞: 在函数执行去处的过程当中, 当前线程能够去作其它事情. 在等待客服回复的过程当中, 你上了个厕所, 还顺便洗了个澡.

换句话说:

  1. 同步与异步, 描述的是 被调用的函数, 如何将结果返回给调用者
  2. 阻塞与非阻塞, 描述的是 调用方, 在获得结果以前能不能脱身

这是两个维度上的逻辑概念, 这两个维度互相有必定的干涉, 并非彻底正交的两个维度, 这样, 既然是两个维度, 那么就有四种组合.

  1. 同步, 且阻塞: 调用方发起调用直至获得结果以前, 都不能干其它事情. 被调函数接收到参数直到运算结束以前, 都不会返回.
  2. 同步, 非阻塞: 调用方发起调用直至获得结果以前这段时间, 能够作其它事情. 但被调函数接收到参数直到运算结束以前, 都不会返回. 很显然这个逻辑概念说得通, 但实际上是反常理的. 由于: 若是调用方在发起调用以后, 获得结果(函数返回)以前, 要去作其它事情, 那么就有一个隐含的前提条件: 调用方必须知道本次调用的耗时, 且被调方(函数)严格遵照这个时间约定. 一毫秒很少, 一毫秒很多. 这在代码的世界里是很难达到的.
  3. 异步, 且阻塞: 调用方发起调用直至获得结果以前, 都不能干其它事情. 被调用函数接收到参数以后当即返回, 但在随后的某个时间点才把运算结果传递给调用方. 以后调用方继续活动. 这个逻辑概念依然说得通, 可是很别扭. 这就至关于, 在你问淘宝客服问题的时候, 淘宝客服的自动回复机器人已经给你说了"客服很忙哟, 可能过一会才能答复你", 但你就是啥也不干, 非得等到客服答复你以后, 才去上厕所. 这种情景在代码世界里可能发生, 但彷佛很智障.
  4. 异步, 非阻塞: 调用方发起调用直至获得结果以前这段时间, 能够作其它事情. 被调函数接收到参数后当即返回, 但在以后的某一个时间点才把运算结果传递给调用方. 这提及来很绕口, 举个栗子, 仍是客服:

    1. 你拿出手机, 向客服发送消息, "在吗?". 而后把手机放桌子上, 转向上厕所去了.
    2. 客服收到你的消息, 机器人回复你"很差意思, 客服如今很忙, 但咱们会尽快答复你的, 亲!".
    3. 你上厕所回来了, 看手机没消息, 又去吃饭了.
    4. 客服开始处理你的消息, 终于开始给你真正的回复"亲, 2333号客服为您服务, 你有什么要了解的吗?".
    5. 你吃饭的过程当中, 手机震动, 你点开淘宝, 发现有了回复. 整个流程结束.

能够看到

  1. 阻塞方式下, 调用方老是能第一时间拿到调用结果. 由于在阻塞期间, 调用方啥也不干, 就等着函数返回结果. 非阻塞方式下, 调用方通常都是在函数返回告终果以后才去查看运算结果.
  2. 异步方式下, 被调用方能够推迟处理任务. 客服收到你的消息后能够先把饭吃完, 函数收到你的调用后并不必定当即就开始运算.
  3. 同步且阻塞, 双方都是杠精, 都是老实人. 理解起来比较天然.
  4. 异步非阻塞, 调用方不在意何时能获得运算结果. 被调用方不在意调用方着急不着急, 双方都是佛系青年. 理解起来也比较天然.

还有一个点要给你们介绍到, 就是回调函数. 在上面讲过, 异步调用, 须要函数以某种机制, 在运算结果得出以后, 将运算结果传递给调用方. 但回调函数又绕了一个弯.

假设没有回调函数机制, 异步流程就是:

  1. 顾客询问客服, "大家家有没有红色36D的胸罩啊? 我想给我老婆买一件, 我老婆的胸是36D的". 而后去上厕所去了
  2. 自动机器人向顾客回复"很忙哟, 请耐心等待"
  3. 客服开始处理顾客的询问. 去库房查货.
  4. 库房有货, 客服要想办法将这个信息送到顾客手中. 他经过淘宝客户端发表了答复, 淘宝客户端致使手机震动, 这个震动信号通知了顾客.
  5. 顾客在厕所正拉屎, 看到手机上的消息提醒, 思考了一分钟, 顾客下单购买了这个胸罩.

这个流程里顾客作了两件事:

  1. 询问客服"有没有36D的红色胸罩". 这是调用函数的行为
  2. 在获得确定的答复以后, 下单购买了这个胸罩. 这是获得函数返回的运算结果, 并根据运算结果进一步执行程序流程.(调用了另一个函数: 购买)

而淘宝客服只作了一件事:

  1. 查询库房里是否有货

而有了回调机制后, 异步流程就是这样的:

  1. 顾客询问客服, "大家家有没有红色36D的胸罩?". 而后顾客把手机交给秘书, 叮嘱道:"你盯着这个客服, 若是她说有, 你就下单买了, 地址写我家, 若是没有, 你就啥也不作". 而后顾客坐上了出差的飞机
  2. 自动机器人向顾客回复"很忙哟, 请耐心等待"
  3. 客服开始处理顾客的询问. 去库房查货.
  4. 库房有货, 客服要想办法将这个信息送到顾客手中. 他经过淘宝客户端发表了答复, 淘宝客户端致使手机震动, 这个震动信号通知了秘书.
  5. 秘书根据老板的指示, 下单购买了这个胸罩.

这个流程里, 顾客作了两件事:

  1. 询问客服"有没有36D的胸罩". 这是调用函数行为.
  2. 向秘书叮嘱. 这是向消息监控方注册回调函数的行为. 消息监控方负责接收函数的返回结果. 回调函数则是: "若是有, 就买给老板夫人, 若是没有, 就什么也不作"

淘宝客服只作了一件事:

  1. 查询库房里是否有货

而消息监控方, 也就是秘书, 作了一件事:

  1. 根据客服的答复选择不一样的行为. 即在函数调用结果得出以后, 调用回调函数.

这就是回调函数的一个生动的例子, 回调函数机制中有了一个调用结果监控方, 就是秘书, 这个角色承担着很是重要的职责: 便是在函数返回结果以后, 调用对应的回调函数. 回调机制通常都实如今异步调用框架之中, 对于写代码的人来讲是透明的, 它简化了调用方的职责与智力负担, 必定程度上抽象了代码逻辑, 简化了编程模型(注意: 是必定程度上!). 有了回调机制:

  1. 调用方没必要再去关心函数返回结果以及返回时机. 没必要经过轮询或其它方式去检查异步函数是否返回告终果.
  2. 调用方在调用时就向调用结果监控方注册合适的回调, 在调用函数那一刻, 将后续业务逻辑写在回调函数中, 只负责调用就好了. 代码越写越像状态机.

不过正所谓回调一时爽, 调试火葬厂. 写过JavaScript的同窗对这一点必定是深有体会. 当程序不能正确运行的时候, 调试很蛋疼. 异步框架自己因为函数返回时机不肯定, 调试就比较蛋疼, 再加上回调机制, 那真是火葬厂了. 特别是回调嵌套回调, 里面套个七八层的时候, 那真是把图灵从坟里挖出来也没用的绝望场景.

2. 异步IO与多路复用技术

咱们先来看一段经典的同步且阻塞的HTTP客户端程序:

#include <netinet/in.h>     // for socketaddr_in
#include <sys/socket.h>     // for socket functions
#include <netdb.h>          // for gethostbyname
#include <sys/errno.h>      // for errno

#include <unistd.h>
#include <string.h>
#include <stdio.h>

int main(int argc, char ** argv)
{
    const char query[] = "GET / HTTP/1.0\r\n"
                         "Host: www.baidu.com\r\n"
                         "\r\n";
    const char hostname[] = "www.baidu.com";

    struct sockaddr_in sin;
    struct hostent * h;
    const char * cp;
    int fd;
    ssize_t n_written, remaining;

    char buf[4096];

    /*
     * Look up the IP address for the hostname.
     * Watch out; this isn't threadsafe on most platforms.
     */
    h = gethostbyname(hostname);

    if(!h)
    {
        fprintf(stderr, "E: gethostbyname(%s) failed. ErrMsg: %s\n", hostname, hstrerror(h_errno));
        return -__LINE__;
    }

    if(h->h_addrtype != AF_INET)
    {
        fprintf(stderr, "E: gethostbyname(%s) returned an non AF_INET address.\n", hostname);
        return -__LINE__;
    }

    /*
     * Allocate a new socket
     */
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd < 0)
    {
        fprintf(stderr, "E: socket failed: %s\n", strerror(errno));
        return -__LINE__;
    }

    /*
     * Connect to the remote host
     */
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    sin.sin_addr = *((struct in_addr *)(h->h_addr));
    if(connect(fd, (struct sockaddr *)(&sin), sizeof(sin)) != 0)
    {
        fprintf(stderr, "E: connect to %s failed: %s\n", hostname, strerror(errno));
        close(fd);
        return -__LINE__;
    }

    /*
     * Write the query
     * XXX Can send succeed partially?
     */
    cp = query;
    remaining = strlen(query);
    while(remaining)
    {
        n_written = send(fd, cp, remaining, 0);
        if(n_written < 0)
        {
            fprintf(stderr, "E: send failed: %s\n", strerror(errno));
            close(fd);
            return -__LINE__;
        }

        remaining -= n_written;
        cp += n_written;
    }

    /*
     * Get an answer back
     */
    while(1)
    {
        ssize_t result = recv(fd, buf, sizeof(buf), 0);
        if(result == 0)
        {
            break;
        }
        else if(result < 0)
        {
            fprintf(stderr, "recv failed: %s\n", strerror(errno));
            close(fd);
            return -__LINE__;
        }

        fwrite(buf, 1, result, stdout);
    }

    close(fd);

    return 0;
}

在上面的示例代码里, 大部分有关网络与IO的函数调用, 都是阻塞式的. 好比gethostbyname, 在DNS解析成功域名以前是不返回的(或者解析失败了会返回失败), connect函数, 在与对端主机成功创建TCP链接以前是不返回的(或者链接失败), 再好比recvsend函数, 在成功操做, 或明确失败以前, 也是不返回的.

阻塞式IO确实比较土, 上面的程序编译运行的时候, 若是你网络情况很差, 可能会卡一两秒才会读到百度的首页, 这卡就是由于阻塞IO的缘故. 固然, 虽然比较土, 但像这样的场合, 使用阻塞IO是没什么问题的. 但假如你想写一个程序同时读取两个网站的首页的话, 就比较麻烦了: 由于你不知道哪一个网站会先响应你的请求.. 你能够写一些, 好比像下面这样的, 很土的代码:

char buf[4096];
int i, n;
while(i_still_want_to_read())
{
    for(i = 0; i < n_sockets; ++i)
    {
        n = recv(fd[i], buf, sizeof(buf), 0);

        if(n == 0)
        {
            handle_close(fd[i]);
        }
        else if(n < 0)
        {
            handle_error(fd[i], errno);
        }
        else
        {
            handle_input(fd[i], buf, n);
        }
    }
}

若是你的fd[]数组里有两个网站的链接, fd[0]接着百度, fd[1]接着hao123, 假如hao123正常响应了, 能够从fd[1]里读出数据了, 但百度的服务器被李老板炸了, 响应不了了, 这时, 上面的代码就会卡在i==0时循环里的n = recv(fd[0], buf, sizeof(buf), 0)这条语句中, 直到李老板把服务器修好. 这就很蛋疼.

固然, 你能够用多线程解决这个问题, 多数状况下, 你有一个问题, 你尝试使用多线程解决, 而后你多个了有问题.

上面是一个冷笑话, 多线程或多进程是一个解决方案, 一般状况下, 最简单的套路是使用一个线程或进程去创建TCP链接, 而后链接创建成功后, 为每一个链接建立独立的线程或进程来进行IO读写. 这样即便一个网站抽风了, 也只阻塞属于它本身的那个读写线程或进程, 不会影响到其它网站的响应.

下面是另一个例子程序, 这是一个服务端程序, 监听40173端口上的TCP链接请求, 而后把客户端发送的数据按ROT13法再回写给客户端, 一次处理一行数据. 这个程序使用Unix上的fork()函数为每一个客户端的链接建立一个独立的处理进程.

#include <netinet/in.h>     // for sockaddr_in
#include <sys/socket.h>     // for socket functions
#include <sys/errno.h>      // for errno

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX_LINE 16384

char rot13_char(char c)
{
    if(
        (c >= 'a' && c <= 'm') ||
        (c >= 'A' && c <= 'M')
    )
    {
        return c+13;
    }
    else if(
        (c >= 'n' && c <= 'z') ||
        (c >= 'N' && c <= 'Z')
    )
    {
        return c-13;
    }
    else
    {
        return c;
    }
}

void child(int fd)
{
    char outbuf[MAX_LINE + 1];  // extra byte for '\0'
    size_t outbuf_used = 0;
    ssize_t result;

    while(1)
    {
        char ch;
        result = recv(fd, &ch, 1, 0);

        if(result == 0)
        {
            break;
        }
        else if(result == -1)
        {
            perror("read");
            break;
        }

        if(outbuf_used < sizeof(outbuf))
        {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if(ch == '\n')
        {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

void run(void)
{
    int listener;
    struct sockaddr_in sin;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0)
    {
        perror("bind");
        return;
    }

    if(listen(listener, 16) < 0)
    {
        perror("listen");
        return;
    }

    while(1)
    {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr *)(&ss), &slen);

        if(fd < 0)
        {
            perror("accept");
        }
        else
        {
            if(fork() == 0)
            {
                child(fd);
                exit(0);
            }
        }
    }
}

int main(int argc, char ** argv)
{
    run();
    return 0;
}

你可使用下面的命令行, 经过netcat工具向本机的40713发送数据, 来试验一下上面的服务端代码:

printf "abcdefghijklmnopqrstuvwxyz\n" | nc -4 -w1 localhost 40713

多进程或多线程确实是一个还算是比较优雅的, 应对并发链接的解决方案. 这种解决方案的缺陷是: 进程或线程的建立是有开销的, 在某些平台上, 这个开销仍是比较大的. 这里优化的方案是使用线程, 并使用线程池策略. 若是你的机器须要处理上千上万的并发链接, 这就意味着你须要建立成千上万个线程, 想象一下, 服务器通常也就十几个核心, 64个不得了了, 若是有五千并发链接, 5000个线程排除轮64个核心的大米, 线程调度确定是个大开销.

这个时候咱们就须要了解一下非阻塞了, 经过下面的Unix调用, 能够将一个文件描述符设置为"非阻塞"的. 明确一下: "非阻塞"描述的是IO函数的行为, 将一个文件描述符设置为"非阻塞"的, 实际上是指, 在这个文件描述符上执行IO操做, 函数的行为会变成非阻塞的.

fcntl(fd, F_SETFL, O_NONBLOCK);

当这个文件描述符是socket的文件描述符时, 咱们通常也会直接称, "把一个socket设置为非阻塞". 将一个socket设置为非阻塞以后, 在对应的文件描述符上, 不管是执行网络编程相关的函数, 仍是执行IO相关的函数, 函数行为都会变成非阻塞的, 即函数在调用以后就当即返回: 要么当即返回成功, 要把当即告诉调用者: "暂时不可用, 请稍后再试"

有了非阻塞这种手段, 咱们就能够改写咱们的访问网页程序了: 咱们这时能够正确的处理同时下载两个网站的数据的需求了. 代码片段以下:

int i, n;
char buf[1024];

for(i = 0; i < n_sockets; ++i)
{
    fcntl(fd[i], F_SETFL, O_NONBLOCK);
}

while(i_still_want_to_read)
{
    for(int i = 0; i < n_sockets; ++i)
    {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if(n == 0)
        {
            handle_close(fd[i]);        // peer was closed
        }
        else if(n < 0)
        {
            if(errno == EAGAIN)
            {
                // do nothing, the kernel didn't have any data for us to read
                // retry
            }
            else
            {
                handle_error(fd[i], errno);
            }
        }
        else
        {
            handle_input(fd[i], buf, n);    // read success
        }
    }
}

这样写确实解决了问题, 可是, 在对端网站尚未成功响应的那几百毫秒里, 这段代码将会疯狂的死循环, 会把你的一个核心占满. 这是一个很蛋疼的解决方案, 缘由是: 对于真正的数据什么时候到达, 咱们没法肯定, 只能开个死循环轮询.

旧式的改进方案是使用一个叫select()的系统调用函数. select()函数内部维护了三个集合:

  1. 有数据可供读取的文件描述符
  2. 能够进行写入操做的文件描述符
  3. 出现异常的文件描述符

select()函数在这三个集合有至少一个集合不为空的时候返回. 若是三个集合都为空, 那么select()函数将阻塞.

下面是使用select()改进后的代码片段:

fd_set readset;
int i, n;
char buf[1024];

while(i_still_want_to_read)
{
    int maxfd = -1;
    FD_ZERO(&readset);

    // add all of the interesting fds to readset
    for(i = 0; i < n_sockets; ++i)
    {
        if(fd[i] > maxfd)
        {
            maxfd = fd[i];
        }

        FD_SET(fd[i], &readset):
    }

    select(maxfd+1, &readset, NULL, NULL, NULL);

    for(int i = 0; i < n_sockets; ++i)
    {
        if(FDD_ISSET(fd[i], &readset))
        {
            n = recv(fd[i], &readset);

            if(n == 0)
            {
                handle_close(fd[i]);
            }
            else if(n < 0)
            {
                if(errno == EAGAIN)
                {
                    // the kernel didn't have any data for us to read
                }
                else
                {
                    handle_error(fd[i], errno);
                }
            }
            else
            {
                handle_input(fd[i], buf, n);
            }
        }
    }
}

使用select()改进了程序, 但select()蛋疼的地方在于: 它只告诉你, 三集合中有数据了, 可是: 哪一个fd可读, 哪一个fd可写, 哪一个fd有异常, 这些具体的信息, 它仍是没告诉你. 若是你的fd数量很少, OK, 上面的代码没什么问题, 但若是你持有着上千个并发链接, 那每次select()返回时, 你都须要把全部fd都轮一遍.

下面是使用select()调用对rot13服务端示例代码的重构

#include <netinet/in.h>     // for sockaddr_in
#include <sys/socket.h>     // for socket functions
#include <sys/errno.h>      // for errno
#include <fcntl.h>          // for fcntl
#include <sys/select.h>     // for select

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX_LINE 16384

char rot13_char(char c)
{
    if(
        (c >= 'a' && c <= 'm') ||
        (c >= 'A' && c <= 'M')
    )
    {
        return c+13;
    }
    else if(
        (c >= 'n' && c <= 'z') ||
        (c >= 'N' && c <= 'Z')
    )
    {
        return c-13;
    }
    else
    {
        return c;
    }
}

struct fd_state{
    char buffer[MAX_LINE];
    size_t buffer_used;

    int writing;
    size_t n_written;
    size_t write_upto;
};

struct fd_state * alloc_fd_state(void)
{
    struct fd_state * state = malloc(sizeof(struct fd_state));
    if(!state)
    {
        return NULL;
    }
    
    state->buffer_used = state->n_written = state->writing = state->write_upto = 0;
    return state;
}

void free_fd_state(struct fd_state * state)
{
    free(state);
}

void make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}

int do_read(int fd, struct fd_state * state)
{
    char buf[1024];
    int i;
    ssize_t result;

    while(1)
    {
        result = recv(fd, buf, sizeof(buf), 0);

        if(result <= 0)
        {
            break;
        }

        for(int i = 0; i < result; ++i)
        {
            if(state->buffer_used < sizeof(state->buffer))
            {
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            }

            if(buf[i] == '\n')
            {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    if(result == 0)
    {
        return 1;
    }
    else if(result < 0)
    {
        if(errno == EAGAIN)
        {
            return 0;
        }

        return -1;
    }

    return 0;
}

int do_write(int fd, struct fd_state * state)
{
    while(state->n_written < state->write_upto)
    {
        ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0);

        if(result < 0)
        {
            if(errno == EAGAIN)
            {
                return 0;
            }

            return -1;
        }

        assert(result != 0);

        state->n_written += result;
    }

    if(state->n_written == state->buffer_used)
    {
        state->n_written = state->write_upto = state->buffer_used = 0;
    }

    state->writing = 0;

    return 0;
}

void run(void)
{
    int listener;
    struct fd_state * state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);
    
    for(i = 0; i < FD_SETSIZE; ++i)
    {
        state[i] = NULL;
    }

    listener = socket(AF_INET, SOCK_STREAM, 0);
    make_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0)
    {
        perror("bind");
        return;
    }

    if(listen(listener, 16) < 0)
    {
        perror("listen");
        return;
    }

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while(1)
    {
        maxfd = listener;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for(i = 0; i < FD_SETSIZE; ++i)
        {
            if(state[i])
            {
                if(i > maxfd)
                {
                    maxfd = i;
                }

                FD_SET(i, &readset);

                if(state[i]->writing)
                {
                    FD_SET(i, &writeset);
                }
            }
        }

        if(select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0)
        {
            perror("select");
            return;
        }

        if(FD_ISSET(listener, &readset))
        {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);

            int fd = accept(listener, (struct sockaddr *)(&ss), &slen);

            if(fd < 0)
            {
                perror("accept");
            }
            else if(fd > FD_SETSIZE)
            {
                close(fd);
            }
            else
            {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                assert(state[fd]);
            }
        }

        for(i = 0; i < maxfd + 1; ++i)
        {
            int r = 0;

            if(i == listener)
            {
                continue;
            }

            if(FD_ISSET(i, &readset))
            {
                r = do_read(i, state[i]);
            }

            if(r == 0 && FD_ISSET(i, &writeset))
            {
                r = do_write(i, state[i]);
            }

            if(r)
            {
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

int main(int argc, char ** argv)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();

    return 0;
}

但这样还不够好: FD_SETSIZE是一个很大的值, 至少不小于1024. 当要监听的fd的值比较大的时候, 就很恶心, 遍历会遍历不少次. 对于非阻塞IO接口来说, select是一个很粗糙的解决方案, 这个系统调用提供的功能比较薄弱, 只能说是够用, 但接口确实太屎了, 很差用, 性能也堪优.

不一样的操做系统平台上提供了不少select的替代品, 它们都用于配套非阻塞IO接口来使单线程程序也有必定的并发能力. 这些替代品有poll(), epoll(), kqueue(), evports/dev/poll. 而且这些替代品的性能都比select()要好的多. 但比较蛋疼的是, 上面提到的全部接口, 几乎都不是跨平台的. epoll()是Linux独有的, kqueue()是BSD系列(包括OS X)独有的. evports/dev/poll是Solaris独有的. 是的, select()属于POSIX标准的一部分, 但就是性能捉急. 也就是说, 若是你写的程序想跨平台, 高性能, 你就得本身写一层抽象, 把不一样平台对于IO多路复用的底层统一块儿来: 这也就是Libevent干的事情.

libevent的低级API为IO多路复用提供了统一的接口, 其底层实如今不一样的操做系统平台上都是最高效的实现.

下面, 咱们将使用libevent对上面的程序进行重构. 注意: fd_sets不见了, 取而代之的是一个叫event_base的结构体.

/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

struct fd_state {
    char buffer[MAX_LINE];
    size_t buffer_used;

    size_t n_written;
    size_t write_upto;

    struct event *read_event;
    struct event *write_event;
};

struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *state = malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
    if (!state->read_event) {
        free(state);
        return NULL;
    }
    state->write_event =
        event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);

    if (!state->write_event) {
        event_free(state->read_event);
        free(state);
        return NULL;
    }

    state->buffer_used = state->n_written = state->write_upto = 0;

    assert(state->write_event);
    return state;
}

void
free_fd_state(struct fd_state *state)
{
    event_free(state->read_event);
    event_free(state->write_event);
    free(state);
}

void
do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        assert(state->write_event);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                assert(state->write_event);
                event_add(state->write_event, NULL);
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        free_fd_state(state);
    } else if (result < 0) {
        if (errno == EAGAIN) // XXXX use evutil macro
            return;
        perror("recv");
        free_fd_state(state);
    }
}

void
do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;

    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN) // XXX use evutil macro
                return;
            free_fd_state(state);
            return;
        }
        assert(result != 0);

        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 1;

    event_del(state->write_event);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) { // XXXX eagain??
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
    } else {
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        assert(state); /*XXX err*/
        assert(state->write_event);
        event_add(state->read_event, NULL);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char **v)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

总之:

  1. 注意连接的时候加上 -levent
  2. 代码量没有减小, 逻辑也没有简化. libevent只是给你提供了一个通用的多路IO接口. 或者叫事件监听接口.
  3. evutil_socket_t类型的使用, 与evutil_make_socket_nonblocking()函数的使用, 均是为也跨平台兼容性. 使用这些类型名与工具函数, 使得在windows平台上代码也能跑起来.

如今, 你看, 异步IO+事件处理(或者叫多路IO复用), 是单线程单进程程序取得并发能力的最佳途径, 而libevent则是把多平台的IO多路复用库给你抽象统一成一层接口了. 这样代写的代码不须要改动, 就能够运行在多个平台上.

这样就有了三个问题:

  1. 若是个人代码须要跨平台, 或者只须要跨部分平台(好比我只考虑Linux和BSD用户, 彻底不考虑Windows平台), 我为何不本身把多路IO库作个简单的封装, 为何要使用libevent呢? 典型的就是Redis, 用了很薄的一层封装, 下面统一了epoll, kqueue, evport, select等. 为何, 我须要使用libevent呢?
  2. 若是将libevent做为一个黑盒去用, 不可避免的问题就是: 它的性能怎么样? 它封装了多个多路IO库, 在封装上是否有性能损失?
  3. 如今是个轮子都说本身解决了跨平台问题, 那么libevent在windows上表现怎么样? 它能兼容IOCP式多路IO库吗? 毕竟IOCP的设计思路和epoll``select``evport``kqueue等都不同.

答案在这里:

  1. 你没有任何理由非得使用libevent, redis就是一个很好的例子. libevent有很多功能, 但若是你只是跨小部分平台, 而且只关注在多路IO复用上, 那么真的没什么必要非得用libevent. 你彻底能够像redis那样, 用几百行简单的把多路IO库本身封装一下.
  2. 基本上这么讲吧: 你使用系统原生异步IO多路复用接口的性能是多少, 使用libevent就是多少. 说实施libevent里没太多的抽象, 接口也没有多么好用, 封闭很薄, 和你使用原生接口基本同样.
  3. libevent从版本2开始就能搞定windows了. 上面咱们使用的是libevent很底层的接口, 其设计思路是遵循*nix上的事件处理模型的, 典型的就是selectepoll: 当网络可读写时, 通知应用程序去读去写. 而windows上IOCP的设计思路是: 当网络可读可写时不通知应用程序, 而是先完成读与写, 再通知应用程序, 应用程序直接拿到的就是数据. 当在libevent 2提供的bufferevents系列接口中, 它将*nix平台下的设计, 改巴改巴改为了IOCP式的. 使用这个系列的接口不可避免的, 对*nix平台有性能损失(这和asio封装网络库是同样的作法), 但实话讲, IOCP式的设计确实对程序员更友好, 代码可读性高了很多.

总的来讲, 你应该在以下的场合使用libevent

  1. 代码须要跨多个平台, 甚至是windows
  2. 想在*nix平台上使用IOCP式的事件接口编程
  3. 你不想本身封装多个平台上的多路IO接口, 而且自认为, 就算本身作, 作的也确定没有libevent好. libevent是一个久经考验的很基础的库. 身经百战.

下面是使用bufferevents系列接口, 以IOCP式风格对以前例子代码的重构, 体验一下更人性的事件处理方式:

/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void
readcb(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input, *output;
    char *line;
    size_t n;
    int i;
    input = bufferevent_get_input(bev);
    output = bufferevent_get_output(bev);

    while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
        for (i = 0; i < n; ++i)
            line[i] = rot13_char(line[i]);
        evbuffer_add(output, line, n);
        evbuffer_add(output, "\n", 1);
        free(line);
    }

    if (evbuffer_get_length(input) >= MAX_LINE) {
        /* Too long; just process what there is and go on so that the buffer
         * doesn't grow infinitely long. */
        char buf[1024];
        while (evbuffer_get_length(input)) {
            int n = evbuffer_remove(input, buf, sizeof(buf));
            for (i = 0; i < n; ++i)
                buf[i] = rot13_char(buf[i]);
            evbuffer_add(output, buf, n);
        }
        evbuffer_add(output, "\n", 1);
    }
}

void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
    if (error & BEV_EVENT_EOF) {
        /* connection has been closed, do any clean up here */
        /* ... */
    } else if (error & BEV_EVENT_ERROR) {
        /* check errno to see what error occurred */
        /* ... */
    } else if (error & BEV_EVENT_TIMEOUT) {
        /* must be a timeout event handle, handle it */
        /* ... */
    }
    bufferevent_free(bev);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) {
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd);
    } else {
        struct bufferevent *bev;
        evutil_make_socket_nonblocking(fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
        bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
        bufferevent_enable(bev, EV_READ|EV_WRITE);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char ** argv)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

说实话也没人性到哪里去, 底层库就是这样, libevent仍是太基础了. 算不上十分友好的轮子.

3. Libevent 简介

如今咱们要正式介绍Libevent

3.1 libevent的卖点

  1. 代码跨平台.
  2. 性能高. libevent在非阻塞IO+多路复用的底层实现上, 选取的是特定平台上最快的接口. 好比Linux上用epoll, BSD上用kqueue
  3. 高并发可扩展. libevent就是为了那种, 须要维持成千上万的活动socket链接的应用程序使用的.
  4. 接口友好. 虽然并无友好多少, 但至少比原生的epoll要好一点.

3.2 libevent下的各个子模块

  1. evutil 通用类型定义, 跨平台相关的通用定义, 以及一些通用小函数
  2. event and event_base 核心模块. 事件库. *nix风格的事件模型: 在socket可读可写时通知应用程序.
  3. bufferevent 对核心事件库的再一层封装, IOCP式的事件模型: 在数据已读已写后通知应用程序
  4. evbuffer 这是bufferevent模块内部使用的缓冲区实现.
  5. evhttp 简单的HTTP C/S实现
  6. evdns 简单的 DNS C/S实现
  7. evrpc 简单的 RPC实现

总的来讲, 做为使用者, 须要关心的是:

  1. evutil是须要关心的
  2. 对于主在*nix平台上写后台服务端程序的人: 只须要关心 event and event_base 核心库的用法便可.
  3. 对于跨平台, 特别是包含win平台的开发人员: 须要关注 buffereventevbuffer, 对于核心库event and event_base, 能够不关心
  4. evhttp, evdns, evrpc, 如无须要, 能够不用关心

3.3 libevent下的二进制库

如下是在连接你的代码的时候, 你须要了解的二进制库.

  1. libevent_core 包含event and event_base, evutil, evbuffer, bufferevent中的全部函数
  2. libevent_extra 包含协议相关的函数. 包括 HTTP/DNS/RPC 等. 若是你用不到 evhttp/evdns/evrpc里的函数, 那么这个库不用连接.
  3. libevent 满清遗老, 包含了上面两个库里的全部函数. 官方不建议在使用libevent 2.0以上的版本时连接这个库. 这是个懒人库.
  4. libevent_pthreads 若是你编写多线程应用程序. 那么这个库里包含了基于POSIX线程库的相关函数实现. 若是你没有用到libevent中有关的多线程函数, 那么这个库不用连接. 之前这些函数是划分在libevent_core中的, 后来被单独割出来了.注意: 这个库不是全平台的.
  5. libevent_openssl 这个库里的与OpenSSL相关的函数实现. 若是你没有用到libevent中有关OpenSSL的函数, 那么这个库不用连接. 之前这些函数也算在libevent_core中, 最后也割出来了. 注意: 这个库也不是全平台的

3.4 libevent中的头文件

libevent中的头文件分为三类, 全部头文件都位于event2目录下. 也就是说在代码中你应当这样写:

#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>

具体有哪些头文件在后续章节会详细介绍, 目前只介绍这个分类:

  1. API 头文件. 这些头文件定义了libevent对外的接口. 这些头文件没有特定前缀.
  2. 兼容性 头文件. 这些头文件是为了向前兼容老版本libevent存在的, 它们里面定义了老版本的一些废弃接口. 除非你是在作老代码迁移工做, 不然不建议使用这些头文件.
  3. 类型定义 头文件. 定义了libevent库中相关的类型. 这些头文件有共同后缀_struct.h

3.5 如何将老版本的代码迁移到libevent 2上

官方建议你们使用版本2, 但有时候这个世界就是不那么让人舒服, 若是你须要和版本1的历史代码打交道, 你能够参照下面的对照表: 老头文件与新头文件的对照表

旧头文件 新头文件
event.h event2/event*.h, event2/buffer*.h, event2/bufferevent*.h, event2/tag*.h
evdns.h event2/dns*.h
evhttp.h event2/http*.h
evrpc.h event2/rpc*.h
evutil.h event2/util*.h

在当前的2.0版本中, 老的旧头文件实际上是不须要替换的, 这些旧头文件依然存在. 但仍是建议将他们替换成新头文件, 由于说不定50年后libevent升级到3.0版本, 这些旧头文件就被扔了.

另外还有一些点须要你注意:

  1. 在1.4版本以前, 只有一个二进制库文件. libevent, 里面是libevent的全部实现. 现在这些实现被分割到了 libevent_corelibevent_extra两个库中.
  2. 在2.0以前, libevent不支持锁. 也就是说, 2.0以前若是要写出线程安全的代码, 你只能避免在线程间共享数据实例. 没有其它办法.

3.6 满清遗老

官方对待老版本是这样建议的:

  1. 1.4.7以前的版本被正式废弃了
  2. 1.3以前的版本有一堆bug, 用的时候看脸吧.
  3. 推荐使用2.0后的版本

我对老版本的态度是这样的: 能干活就好. 没有特殊缘由, 我是不会作代码迁移的. 而且考虑到应用场景, 有时候用老版本也挺好的.

1.4.x版本的libevent被大量项目使用, 其实挺稳定的, 官方不建议使用, 只是官方再也不在1.4版本上再加特性修bug了. 1.4版本最后的一个小版本号就中止在7上不动了. 而对于1.3版本, 确实不该该再碰了.

4. 使用libevent的正确姿式

libevent有几项全局设定, 若是你须要改动这几项设定, 那么确保在代码初始化的时候设定好值, 一旦你的代码流程开始了, 调用了第一个libevent中的任何函数, 后续强烈建议不要再更改设定值, 不然会引发不可预知的后果.

4.1 libevent中的日志

libevent默认状况下将把错误与警告日志写进stderr, 而且若是你须要一些libevent内部的调试日志的话, 也能够经过更改设定来让其输出调试日志, 以在程序崩溃时提供更多的参考信息. 这些行为均可以经过自行实现日志函数进行更改. 下面是libevent相关的日志接口.

// 如下是日志级别
#define EVENT_LOG_DEBUG 0
#define EVENT_LOG_MSG   1
#define EVENT_LOG_WARN  2
#define EVENT_LOG_ERR   3

// 如下是已经被废弃的日志级别定义
/* Deprecated; see note at the end of this section */
#define _EVENT_LOG_DEBUG EVENT_LOG_DEBUG
#define _EVENT_LOG_MSG   EVENT_LOG_MSG
#define _EVENT_LOG_WARN  EVENT_LOG_WARN
#define _EVENT_LOG_ERR   EVENT_LOG_ERR

// 这个是一个函数指针类型别名, 指向日志输出函数
// 日志输出函数应当是一个双参, 无返回值的函数, 第一个参数severity为日志级别, 第二个参数为日志字符串
typedef void (*event_log_cb)(int severity, const char *msg);

// 这是用户自定义设置日志处理函数的接口. 若是调用该函数时入参设置为NULL
// 则将采用默认行为
void event_set_log_callback(event_log_cb cb);

好比下面我须要改写libevent记录日志的方式:

#include <event2/event.h>
#include <stdio.h>

// 丢弃日志
static void discard_cb(int severity, const char * msg)
{
    // 这个日志函数内部什么也不作
}

// 将日志记录至文件
static FILE * logfile = NULL;
static void write_to_file_cb(int severity, const char * msg)
{
    const char * s;
    if(!logfile)
    {
        return;
    }

    switch(severity)
    {
        case EVENT_LOG_DEBUG:   s = "[DBG]";    break;
        case EVENT_LOG_MSG:     s = "[MSG]";    break;
        case EVENT_LOG_WARN:    s = "[WRN]";    break;
        case EVENT_LOG_ERR:     s = "[ERR]";    break;
        default:                s = "[???]";    break;
    }

    fprintf(logfile, "[%s][%s][%s] %s\n", __FILE__, __func__, s, msg);
}

void suppress_logging(void)
{
    event_set_log_callback(discard_cb);
}

void set_logfile(FILE * f)
{
    logfile = f;
    event_set_log_callback(write_to_file_cb);
}

注意: 在自定义的日志输出函数中, 不要调用其它libevent中的函数! 好比, 若是你要把日志远程输出至网络socket上去, 你还使用了bufferevent来输出你的日志, 在目前的libevent版本中, 这会致使一些奇怪的bug. libevent官方也认可这是一个设计上没有考虑到的点, 这可能在后续版本中被移除, 但截止目前的2.1.8 stable版本, 这个问题都尚未解决. 不要做死.

默认状况下的日志级别是EVENT_LOG_MSG, 也就是说EVENT_LOG_DEBUG级别的日志不会调用至日志输出函数. 要让libevent输出调试级别的日志, 请使用下面的接口:

#define EVENT_DBG_NONE 0
#define EVENT_DBG_ALL 0xffffffffu

// 若是传入 EVENT_DBG_NONE, 将保持默认状态: 不输出调试日志
// 若是传入 EVENT_DEG_ALL, 将开启调试日志的输出
void event_enable_debug_logging(ev_uint32_t which);

调试日志很详尽, 一般状况下对于libevent的使用者而言是没有输出的必要的. 由于要用到调试级别日志的场合, 是你百般无奈, 开始怀疑libevent自己有bug的时候. 虽然从宏的命名上, 仿佛还存在着 EVENT_DGB_SOMETHING 这样, 能够单独控制某个模块的调试日志输出的参数, 但实际上并无: 调试日志要么全开, 要么全关. 没有中间地带. 官方宣称可能在后续的版本中细化调试日志的控制.

而若是你要控制其它日志级别的输出与否, 请自行实现日志输出函数. 好比忽略掉EVENT_LOG_MSG级别的日志之类的. 上面的接口只是控制"若是产生了调试日志, libevent调用或不调用日志输出函数"而已.

上面有关日志的接口均定义在<event2/event.h>中.

  1. 日志输出函数相关接口早在版本1.0时就有了.
  2. event_enable_debug_logging()接口在2.1.1版本以后才有
  3. 日志级别宏名, 在2.0.19以前, 是如下划线开头的, 即_DEBUG_LOG_XXX, 但如今已经废弃掉了这种定义, 在新版本中请使用不带下划线开头的版本.

4.2 正确处理致命错误

当libevent检测到有致命的内部错误发生时(好比踩内存了之类的不可恢复的错误), 默认行为是调用exit()abort(). 出现这种状况99.99的缘由是使用者自身的代码出现了严重的bug, 另外0.01%的缘由是libevent自身有bug.

若是你但愿在进程退出以前作点额外的事情, 写几行带fxxk的日志之类的, libevent提供了相关的入口, 这能够改写libevent对待致命错误的默认行为.

typedef void (*event_fatal_cb)(int err);
void event_set_fatal_callback(event_fatal_cb cb);

注意, 不要试图强行恢复这种致命错误, 也就是说, 虽然libevent给你提供了这么个接口, 但不要在注册的函数中试图让进程继续执行. 由于这个时候libevent内部已经有坑了, 若是继续强行恢复, 结果是不可预知的. 换个说法: 这个函数应该提供的是临终遗言, 而不该该试图救死扶伤.

这个函数也定义在 <event2/event.h>中, 在2.0.3版本以后可用.

4.3 内存管理

默认状况下, libevent使用的是标准C库中的内存管理函数, 即malloc(), realloc(), free()等. libevent容许你使用其它的内存管理库, 好比tcmallocjemalloc. 相关接口以下:

void event_set_mem_functions(void *(*malloc_fn)(size_t sz),
                             void *(*realloc_fn)(void *ptr, size_t sz),
                             void (*free_fn)(void *ptr));

接口的第一个参数是内存分配函数指针, 第二个参数是内存重分配函数指针, 第三个参数是内存释放函数指针.

下面是一个使用的例子:

#include <event2/event.h>
#include <sys/types.h>
#include <stdlib.h>

union alignment
{
    size_t sz;
    void * ptr;
    double dbl;
};

#define ALIGNMENT sizeof(union alignment)

#define OUTPTR(ptr)     (((char *)ptr) + ALIGNMENT)
#define INPTR(ptr)      (((char *)ptr) - ALIGNMENT)

static size_t total_allocated = 0;

static void * my_malloc(size_t sz)
{
    void * chunk = malloc(sz + ALIGNMENT);
    if(!chunk)  return chunk;

    total_allocated += sz;

    *(size_t *)chunk = sz;

    return OUTPTR(chunk);
}

static void * my_realloc(void * ptr, size_t sz)
{
    size_t old_size = 0;

    if(ptr)
    {
        ptr = INPTR(ptr);
        old_size = *(size_t*)ptr;
    }

    ptr = realloc(ptr, sz + ALIGNMENT);

    if(!ptr)
    {
        return NULL;
    }

    *(size_t *)ptr = sz;

    total_allocated = total_allocated - old_size + sz;

    return OUTPTR(ptr);
}

static void my_free(void * ptr)
{
    ptr = INPTR(ptr);
    total_allocated -= *(size_t *)ptr;
    free(ptr);
}

void start_counting_bytes(void)
{
    event_set_mem_functions(
        my_malloc, my_realloc, my_free
    );
}

上面这个例子中, 提供了一种记录全局内存使用量的简单方案, 非线程安全.

对于自定义内存管理接口, 须要注意的有:

  1. 再次重申, 这是一个全局设定, 一旦设定, 后续全部的libevent函数内部的内存操做都会受影响. 而且不要在代码流程中途更改设定.
  2. 自定义的内存管理函数, 在分配内存时, 返回的指针后必须确保在至少sz个字节可用.
  3. 自定义的内存重分配函数, 必须正确处理realloc(NULL, sz)这种状况: 即, 使之行为等同于 malloc(sz). 也必须正确处理realloc(ptr, 0)这种状况: 即, 使之行为与free(ptr)相同且返回NULL.
  4. 自定义的内存释放函数, 必须正确处理 free(NULL): 什么也不作.
  5. 自定义的内在分配函数, 必须正确处理 malloc(0): 返回NULL.
  6. 若是你在多线程环境中使用libevent, 请务必确保内存分配函数是线程安全的.
  7. 若是你要释放一个由libevent建立来的内存区域, 请确认你使用的free()版本与libevent内部使用的内存管理函数是一致的. 也就是说: 若是要操做libevent相关的内存区域, 请确保相关的内存处理函数和libevent内部使用的内在管理函数是一致的. 或者简单一点: 若是你决定使用某个内存管理库, 那么在整个项目范围内都使用它, 这样最简单, 不容易出乱子. 不然应该尽力避免在外部操做libevent建立的内存区域.

event_set_mem_functions()接口也定义在<event2/event.h>中, 在2.0.2版本后可用.

须要注意的是: libevent在编译安装的时候, 能够关闭event_set_mem_functions()这个特性. 若是关闭了这个特性, 而在项目中又使用了这个特性, 那么在项目编译时, 编译将报错. 若是要检测当前引入的libevent库是否启用了这个功能, 能够经过检测宏EVENT_SET_MEM_FUNCTIONS_IMPLEMENTED宏是否被定义来判断.

4.4 线程与锁

多线程程序设计里的数据访问是个大难题. 目前的版本里, libevent支持了多线程编程, 但这个支持法呢, 怎么讲呢, 使用者仍是须要知道很多细节才能正确的写出多线程应用. libevent中的数据结构分为三类:

  1. 有一些数据结构就是非线程安全的. 这是历史遗留问题, libevent在大版本号更新为2后才支持多线程, 这些数据结构是从版本1一路继承下来的, 不要在多线程中共享这些实例. 没办法.
  2. 有一些数据结构的实例能够用锁保护起来, 以在多线程环境中共享. 若是你须要在多个线程中访问某个实例, 那么你须要给libevent说明这个状况, 而后libevent会为这个实例加上适当的锁保护, 以确保你在多线程访问它时是安全的. 加锁不须要你去加, 你须要作的只是告诉libevent一声, 如何具体操做后面再讲.
  3. 有些数据结构, 天生就是带锁的. 若是你带 libevent_pthreads 库连接你的程序, 那么这些结构的实例在多线程环境中必定的安全的. 你想让它不安全都没办法.

虽然libevent为你写了一些加锁解锁的无聊代码, 你没必要要手动为每一个对象加锁了, 但libevent仍是须要你指定加锁的函数. 就像你能够为libevent指定其它的内存管理库同样. 注意这也是一个全局设定, 请遵循咱们一再强调的使用规则: 进程初始化时就定好, 后续不准再更改.

若是你使用的是POSIX线程库, 或者标准的windows原生线程库, 那么简单了一些. 设置加解锁函数只须要一行函数调用, 接口以下:

#ifdef WIN32
int evthread_use_windows_threads(void);
#define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
#endif

#ifdef _EVENT_HAVE_PTHREADS
int evthread_use_pthreads();
#define EVTHREAD_USE_PTHREADS_IMPLEMENTED
#endif

这两个函数在成功时都返回0, 失败时返回-1.

这只是加解锁. 但若是你想要自定义的是整个线程库, 那么你就须要手动指定以下的函数与结构定义

  1. 锁的定义
  2. 加锁函数
  3. 解锁函数
  4. 锁分配函数
  5. 锁释放函数
  6. 条件变量定义
  7. 条件变量建立函数
  8. 条件变量释放函数
  9. 条件变量等待函数
  10. 通知/广播条件变量的函数
  11. 线程定义
  12. 线程ID检测函数

这里须要注意的是: libevent并不会为你写哪怕一行的多线程代码, libevent内部也不会去建立线程. 你要使用多线程, OK, 你用哪一种线程库都行, 没问题. 但你须要将配套的锁/条件变量/线程检测函数以及相关定义告诉libevent, 这样libevent才会知道如何在多线程环境中保护本身的实例, 以供你在多线程环境中安全的访问.

  1. 若是你使用的是POSIX线程或者windows原生线程库, 就方便了一点, 调一行函数的事情.
  2. 若是你在使用POSIX纯种或windows原生线程库时, 你不想使用POSIX配套的锁, 那OK, 你在调用完evthread_use_xxx_threads()以后, 把你本身的锁函数或者条件变量函数提供给libevent就行了. 注意这种状况下, 在你的程序的其它地方也须要使用你指定的锁或条件变量.
  3. 而若是你使用的是其它线程库, 也OK, 只不过麻烦一点, 要提供锁的相关信息, 要提供条件变量的相关信息, 也要提供线程ID检测函数

下面是相关的接口

// 锁模式是 lock 与 unlock 函数的参数, 它指定了加锁解锁时的一些额外信息
// 若是调用 lock 或 unlock 时的锁都不知足下面的三种模式, 参数传0便可
#define EVTHREAD_WRITE  0x04        // 锁模式: 仅对读写锁使用: 获取或释放写锁
#define EVTHREAD_READ   0x08        // 锁模式: 仅对读写锁使用: 获取或释放读锁
#define EVTHREAD_TRY    0x10        // 锁模式: 仅在加锁时使用: 仅在能够当即加锁的时候才去加锁. 
                                    //         若当前不可加锁, 则lock函数当即返回失败, 而不是阻塞

// 锁类型是 alloc 与 free 函数的参数, 它指定了建立与销毁的锁的类型
// 锁类型能够是 EVTHREAD_LOCKTYPE_XXX 之一或者为0
// 全部支持的锁类型均须要被登记在 supported_locktypes 中, 若是支持多种锁, 则多个宏之间用 | 连结构成该字段的值
                                            // 当锁类型为0时, 指的是普通的, 非递归锁
#define EVTHREAD_LOCKTYPE_RECURSIVE 1       // 锁类型: 递归锁, 你必须提供一种递归锁给libevent使用
#define EVTHREAD_LOCKTYPE_READWRITE 2       // 锁类型: 读写锁, 在2.0.4版本以前, libevent内部没有使用到读写锁

#define EVTHREAD_LOCK_API_VERSION 1

// 将你要用的有关锁的全部信息放在这个结构里
struct evthread_lock_callbacks {
       int lock_api_version;                // 必须与宏 EVTHREAD_LOCK_API_VERSION的值一致
       unsigned supported_locktypes;        // 必须是宏 EVTHREAD_LOCKTYPE_XXX 的或组合, 或为0
       void *(*alloc)(unsigned locktype);               // 锁分配, 须要指定锁类型
       void (*free)(void *lock, unsigned locktype);     // 锁销毁, 须要指定锁类型
       int (*lock)(unsigned mode, void *lock);          // 加锁, 须要指定锁模式
       int (*unlock)(unsigned mode, void *lock);        // 解锁, 须要指定锁模式
};

// 调用该函数以设置相关锁函数
int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *);

// 调该函数以设置线程ID检测函数
void evthread_set_id_callback(unsigned long (*id_fn)(void));

// 将你要用的有关条件变量的全部信息都放在这个结构里
struct evthread_condition_callbacks {
        int condition_api_version;
        void *(*alloc_condition)(unsigned condtype);
        void (*free_condition)(void *cond);
        int (*signal_condition)(void *cond, int broadcast);
        int (*wait_condition)(void *cond, void *lock,
            const struct timeval *timeout);
};

// 经过该函数以设置相关的条件变量函数
int evthread_set_condition_callbacks(
        const struct evthread_condition_callbacks *);

要探究具体如何使用这些函数, 请看libevent源代码中的evthread_pthread.cevthread_win32.c文件.

对于大多数普通用户来讲, 只须要调用一下evthread_use_windows_threads()evthread_use_pthreads()就好了.

上面这些函数均定义在 <event2/thread.h>中. 在2.0.4版本后这些函数才可用. 2.0.1至2.0.3版本中使用了一些旧接口, event_use_pthreads()等. 有关条件变量的相关接口直至2.0.7版本才可用, 引入条件变量是为了解决以前libevent出现的死锁问题.

libevent自己能够被编译成不支持锁的二进制库, 用这种二进制库连接你的多线程代码, bomshakalaka, 跑不起来. 这算是个无用知识点.

另外额外注意: 多线程程序, 而且还使用了POSIX线程库和配套的锁, 那么你须要连接libevent_pthreads. windows平台则不用.

4.5 小知识点: 有关锁的调试

libevent有一个额外的特性叫"锁调试", 开启这种特性后, libevent将把它内部有关锁的全部调用都再包装一层, 以检测/获取在锁调用过程当中出现的错误, 好比:

  1. 解了一个没有持有的锁
  2. 对一个非递归锁进行了二次加锁

若是出现了上述错误, 则libevent会致使进程退出, 并附送一个断言错误

要开启这个特性, 调用下面的接口:

void evthread_enable_lock_debugging(void);
#define evthread_enable_lock_debuging() evthread_enable_lock_debugging()

注意, 这也是一个全局设置项, 请遵循: 一次设置, 初始化时就设置, 永不改动的规则.

这个特性在2.0.4版本中开始支持, 当时接口函数名拼写错误了, 少写了一个g: evthread_enable_lock_debuging(). 后来在2.1.2版本中把这个错误的拼写修正过来了. 但仍是兼容了以前的错误拼写.

这个特性吧, 很明显是libevent内部开发时使用的. 如今开放出来估计是考虑到, 若是你的代码中出现了一个bug是由libevent内部加解锁失误致使的, 那么用个特性能够定位到libevent内部. 不然你很难把锅甩给libevent. 固然这种状况不多见.

4.6 小知识点: 排除不正确的使用姿式

libevent是一个比较薄的库, 薄的好处是性能很好, 坏处是没有在接口上对使用者作过多的约束. 这就致使一些二把刀使用者常常会错误的使用libevent. 常见的智障行为有:

  1. 向相关接口传递了一个未初始化的事件结构实例
  2. 试图第二次初始化一个正在被使用的事件结构实例

这种错误其实挺难发现的, 为了解决这个痛点, libevent额外开发了一个新特性: 在发生上述状况的时候, libevent给你报错.

但这是一个会额外消耗资源的特性, libevent内部实际上是追踪了每一个事件结构的初始化与销毁, 因此仅在开发测试的时候打开它, 发现问题, 解决问题. 在实际部署的时候, 不要使用这个特性. 开启这个特性的接口以下:

void event_enable_debug_mode(void);

再不厌其烦的讲一遍: 全局设定, 初始化时设定, 一次设定, 永不更改.

这个特性开启后, 也有一个比较蛋疼的事情: 就是若是你的代码里大量使用了event_assign()来建立事件结构, 可能你的程序在这个特性下会OOM挂掉..缘由是: libevent能够经过对event_new()event_free()的追踪来检测事件结构实例是否未被初始化, 或者被屡次初始化, 或者被非法使用. 可是对于event_assign()拷贝来的事件结构, 这追踪就无能为力了, 而且蛋疼的是event_assign()仍是浅拷贝. 这样, 若是你的代码里大量的使用了event_assign(), 这就会致使内置的的追踪功能一旦追上车就下不来了, 完事车太多就OOM挂掉了.

为了不在这个特性下因为追踪event_assign()建立的事件实例(或许这里叫实例已经不合适了, 应该叫句柄)而致使程序OOM, 能够调用下面的函数以解除对这种事件实例的追踪, 以免OOM

void event_debug_unassign(struct event * ev);

这样, 调试模式下, 相关的追踪检测就会放弃追踪由event_assign建立的事件. 因此你看, 这个特性也不是万能的, 有缺陷, 凑合用吧. 在不开启调试模式下, 调用event_debug_unassign()函数没有任何影响

下面是一个例子:

#include <event2/event.h>
#include <event2/event_struct.h>

#include <stdlib.h>

void cb(evutil_socket_t fd, short what, void *ptr)
{
    struct event *ev = ptr;

    if (ev)     // 经过判断入参是否为NULL, 来确认入参携带的事件实例是event_new来的仍是event_assign来的
        event_debug_unassign(ev); // 若是是event_assign来的, 那么就放弃对它的追踪
}

/*
 * 下面是一个简单的循环, 等待fd1与fd2同时可读
 */
void mainloop(evutil_socket_t fd1, evutil_socket_t fd2, int debug_mode)
{
    struct event_base *base;
    struct event event_on_stack, *event_on_heap;        // 一个是栈上的事件实例, 一个是堆上的事件实例

    if (debug_mode)
       event_enable_debug_mode();       // 开启调试模式

    base = event_base_new();

    event_on_heap = event_new(base, fd1, EV_READ, cb, NULL);    // 经过event_new来建立堆上的实例, 并把事件回调的入参设置为NULL
    event_assign(&event_on_stack, base, fd2, EV_READ, cb, &event_on_stack); // 经过event_assign来初始化栈上的实例, 并把事件回调的入参设置为事件实例自身的指针

    event_add(event_on_heap, NULL);
    event_add(&event_on_stack, NULL);

    event_base_dispatch(base);

    event_free(event_on_heap);
    event_base_free(base);
}

这个例子也写的比较蛋疼, 凑合看吧.

另外, 调试模式下的详情调试信息, 只能经过在编译时额外定义宏USE_DEBUG来附加. 即在编译时加上-DUSE_DEBUG来开启. 加上这个编译时的宏定义后, libevent就会输出一大坨有关其内部流程的详情日志, 包括但不限于

  1. 事件的增长
  2. 事件的删除
  3. 与具体平台相关的事件通知信息

这些详情不能经过调用API的方式开启或关闭. 而开启调试模式的API, 在2.0.4版本后才可用.

4.7 检测当前项目中引用的libevent的版本

接口很简单, 以下:

#define LIBEVENT_VERSION_NUMBER 0x02000300
#define LIBEVENT_VERSION "2.0.3-alpha"
const char *event_get_version(void);        // 获取字符串形式的版本信息
ev_uint32_t event_get_version_number(void); // 获取值形式的版本信息

值形式的版本信息由一个uint32_t类型存储, 从高位到低位, 每8位表明一个版本号. 好比 0x02000300表明的版本号就是02.00.03.00. 三级版本号后可能还有一个小版本号, 好比就存在过一个2.0.1.18的版本

下面是一个在编译期检查libevent版本的写法, 若版本小于2.0.1, 则编译不经过. 须要注意的是, 编译期检查的是宏里的值, 若是你的项目构建比较混乱, 极可能出现头文件的版本, 和最终连接的二进制库的版本不一致的状况. 因此编译期检查也不必定靠谱

#include <event2/event.h>

#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000100
#error "This version of Libevent is not supported; Get 2.0.1-alpha or later."
#endif

int
make_sandwich(void)
{
        /* Let's suppose that Libevent 6.0.5 introduces a make-me-a
           sandwich function. */
#if LIBEVENT_VERSION_NUMBER >= 0x06000500
        evutil_make_me_a_sandwich();
        return 0;
#else
        return -1;
#endif
}

下面是一个在运行时检查libdvent版本的写法. 检查运行期的版本是经过函数调用检查的, 这就保证了返回的版本号必定是连接进的库的版本号. 这个比较靠谱. 另外须要注意的是, 数值形式的版本号在libevent2.0.1以后才提供. 因此只能比较蠢的用比较字符串的方式去判断版本号

#include <event2/event.h>
#include <string.h>

int
check_for_old_version(void)
{
    const char *v = event_get_version();
    /* This is a dumb way to do it, but it is the only thing that works
       before Libevent 2.0. */
    if (!strncmp(v, "0.", 2) ||
        !strncmp(v, "1.1", 3) ||
        !strncmp(v, "1.2", 3) ||
        !strncmp(v, "1.3", 3)) {

        printf("Your version of Libevent is very old.  If you run into bugs,"
               " consider upgrading.\n");
        return -1;
    } else {
        printf("Running with Libevent version %s\n", v);
        return 0;
    }
}

int
check_version_match(void)
{
    ev_uint32_t v_compile, v_run;
    v_compile = LIBEVENT_VERSION_NUMBER;
    v_run = event_get_version_number();
    if ((v_compile & 0xffff0000) != (v_run & 0xffff0000)) {
        printf("Running with a Libevent version (%s) very different from the "
               "one we were built with (%s).\n", event_get_version(),
               LIBEVENT_VERSION);
        return -1;
    }
    return 0;
}

接口和宏的定义位于 <event2/event.h>中, 字符串形式的版本号在1.0版本就提供了, 数值形式的版本号直至2.0.1才提供

4.8 一键释放全部全局实例

就算你手动释放了全部在程序代码初始化时建立的libevent对象, 在程序退出以前, 也依然有一些内置的, 对使用者不可见的libevent内部实例以及一些全局配置实例存在着, 而且存在在堆区. 通常状况下不用管它们: 程序都退出了, 释放不释放有什么区别呢? 反正操做系统会帮你清除的. 但有时你想引入一些第三方的分析工具, 好比检测内存泄漏的工具时, 就会致使这些工具误报内存泄漏.

你能够简单的调一下下面这个函数, 完成一键彻底清除:

void libevent_global_shutdown(void);

注意哦: 这个函数不会帮你释放你本身调用libevent接口建立出来的对象哦! 还没那么智能哦!

另外, 很显然的一点是, 当调用了这个函数以后, 再去调用其它libevent接口, 可能会出现异常哦! 因此没事不要调用它, 若是你调用它, 那么必定是自杀前的最后一秒.

函数定义在<event2/event.h>中, 2.1.1版本后可用

相关文章
相关标签/搜索