开源项目SMSS开发指南(二)——基于libevent的线程池

libevent是一套轻量级的网络库,基于事件驱动开发。可以实现多线程的多路复用和注册事件响应。本文将介绍libevent的基本功能以及如何利用libevent开发一个线程池。

一. 使用指南

监听服务和注册链接事件

libevent是一个基于事件驱动的网络库,经过在一个事件循环上注册不一样的事件以完成线程多路复用。因为libevent采用c语言开发,为了使用方便咱们能够将它的功能经过面向对象的设计模式用c++来封装。下面是对经常使用函数的详细介绍:html

(1)event_base_new():建立(初始化)event_baselinux

event_base表明了一个事件循环上下文,全部须要基于这个事件循环的事件都须要注册在它的上面。若是建立成功,最后须要使用event_base_free()来释放资源。c++

(2)evconnlistener_new_bind():绑定一个本地端口并注册网络监听事件git

参数说明:设计模式

  • struct event_base* base 前文建立好的base,事件将关联到这个事件循环上
  • evconnlistener_cb cb 事件触发的回调
  • void *ptr 回调函数的参数,这个参数能够由用户任意指定,方便在回调函数中使用
  • unsigned flags 事件的附加标识,表明事件操做
  • int backlog 网络缓存大小
  • const struct sockaddr *sa socket地址
  • int socklen socket地址长度

函数会返回一个新的evconnlistener,若是建立成功,须要使用evconnlistener_free()来释放资源。缓存

(3)event_base_dispatch():启动事件循环和事件分发网络

这个函数会阻塞当前线程,用户能够在事件回调函数中经过event_base_loopbreak()来中断。若是不但愿当前线程被堵塞也可使用event_base_loop()函数。注意,千万不要在回调函数中清理event_base。多线程

代码示例:负载均衡

// 建立事件循环
ev_base_ = event_base_new();
if (!ev_base_)
{
    return false;
}
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port_);
// 建立socket链接回调
ev_listener_ = evconnlistener_new_bind(
    ev_base_,
    SConnListenerCb,
    this,
    LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
    this->backlog_,
    (sockaddr *)&sin,
    sizeof(sin));
if (!ev_listener_)
{
    return false;
}
while (!quit_)
{
    event_base_loop(ev_base_, EVLOOP_NONBLOCK);
    this_thread::sleep_for(chrono::milliseconds(1));
}
evconnlistener_free(ev_listener_);
event_base_free(ev_base_);
static void SConnListenerCb(struct evconnlistener *listen, evutil_socket_t sock, struct sockaddr *addr, int len, void *ctx)
{
    // 解析客户端ip
    char ip[16] = {0};
    sockaddr_in *addr_in = (sockaddr_in *)addr;
    evutil_inet_ntop(AF_INET, &addr_in->sin_addr, ip, sizeof(ip));
    stringstream ss;
    ss << ip << ":" << addr_in->sin_port << " 链接完成...";
    LOG4CPLUS_INFO(SimpleLogger::Get()->LoggerRef(), ss.str());
    SmsServer *server = (SmsServer *)ctx;
    int s = sock;

    server->ConnListener(s);
}

建立链接和注册读、写、事件监听

(1)bufferevent_socket_new():建立一个带socket缓存的事件socket

bufferevent表示一个事件缓存,每当有数据须要读取的时候,它会先将数据从内核态取出再通知用户。顺带提一下,libevent对事件的触发支持两种模式:(ET)边沿触发和(LT)水平触发。若是你设置了水平触发,可是经过bufferevent来读取消息,不管你是否将消息接收完成,都不会被反复触发回调。所以,使用bufferevent来接收消息的时候,须要特别关注TCP粘包和长包。

(2)bufferevent_setcb():设置bufferevent的回调函数

参数说明:

  • struct bufferevent* bufev 关联对象
  • bufferevent_data_cb readcb 读回调 函数原型void (*bufferevent_data_cb)(struct bufferevent *bev, void* ctx)
  • bufferevent_data_cb writecb 写回调 函数原型(同上)
  • bufferevent_event_cb eventcb 事件回调 函数原型void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx)
  • void *cbarg 回调函数的最后一个参数,由用户指定

读回调顾名思义就是当有数据的时候会触发的函数,但是写回调何时触发?有兴趣的朋友能够本身测试一下。特别须要关注事件回调函数。全部可触发的事件包括:BEV_EVENT_READING(读事件),BEV_EVENT_WRITING(写事件),BEV_EVENT_EOF(结束事件),BEV_EVENT_ERROR(错误事件),BEV_EVENT_TIMEOUT(超时事件),BEV_EVENT_CONNECTED(链接事件)。若是你是在开发服务端BEV_EVENT_CONNECTED事件不会被触发,由于链接事件是在bufferevent建立前产生的。BEV_EVENT_READING || BEV_EVENT_TIMEOUT能够用来表示读数据超时,经过这个事件能够侦测心跳表明距离上次读数据已经超时。BEV_EVENT_WRITING || BEV_EVENT_TIMEOUT能够表示写超时,可是这个事件只会在当有数据须要被发送但是超时未发送成功后才会被触发。

此外,当发生超时事件后,相关的读写操做都会被从bufferevent中移除。若是用户但愿继续以前的操做,须要从新注册读/写。

(3)bufferevent_set_timeouts():设置读/写超时

只有在经过这个函数设置了读/写超时后,在事件回调函数中BEV_EVENT_TIMEOUT才会生效。

代码示例:

bufferevent *buff_ev_ = bufferevent_socket_new(ev_base_, socket_, BEV_OPT_CLOSE_ON_FREE);
if (!buff_ev_)
{
    return false;
}
// 指定参数
bufferevent_setcb(buff_ev_, SReadCb, SWriteCb, SEventCb, this);
bufferevent_enable(buff_ev_, EV_READ | EV_WRITE);
timeval tv = {timeout_, 0};
bufferevent_set_timeouts(buff_ev_, &tv, NULL);
return true;

读写数据

(1)bufferevent_read():从缓存中接收数据

一般在读回调中使用,经过返回值判断缓存中是否还有数据

(2)bufferevent_write():向缓冲写入数据以经过socket发送

返回值表示有多少数据已经被写入进内核

建立基于管道的事件

libevent除了能够用在网络上,还能够和管道(pipe)结合用来生成管道事件。

(1)event_config_new():建立一个事件配置对象

event_config能够用来建立一个非默认的事件循环,一般使用这个函数配合event_base_new_with_config()建立event_base。最后须要使用event_config_free()来释放资源。

(2)event_new():建立一个读/写事件

和bufferevent的建立不一样,event_new()只会建立一个配套的事件,若是在事件中用户没有对数据进行处理,回调会一直被触发。

代码示例:

// 初始化一对管道,只能在linux系统下使用
int pipefd[2];
if (pipe(pipefd))
{
    return false;
}
// pipefd[0]读取管道 pipefd[1]发送管道
this->pipe_endpoint_ = pipefd[1];
// 建立管道事件
event_config *ev_conf = event_config_new();
event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
this->ev_base_ = event_base_new_with_config(ev_conf);
event_config_free(ev_conf);
if (!ev_base_)
{
    return false;
}

pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
event_add(pipe_ev_, 0);

 2、实现线程池

线程池实现原理

libevent能够实现对线程的多路复用,所以咱们能够在一个线程中完成服务端对多个客户端的同时读写操做。这样作虽然可以最大限度的利用系统资源,但是没法充分发挥cpu多线程的处理能力。开发高可用和适合高负载的服务端咱们依然应该考虑启动多个线程来处理数据。关键是咱们如何将事件分发到不一样的线程中以保持多个线程的负载均衡。

  1. 当服务启动的时候首先建立N条线程。每个线程对应一个事件循环event_base。
  2. 主线程负责监听指定端口并在链接的回调函数中处理新链接套接字的处理。
  3. 当有新的客户端链接后,主线程会把套接字先保存在一个队列中。扫描当前全部线程的处理量,选择负载最小的线程利用管道发送一个信号(‘c’)。对应线程的事件循环在管道的读事件中从队列中获取这个套接字,并创建对应的bufferevent进行处理。当前线程负载量+1。
  4. 客户端断开后通知bufferevent所在的线程将负载量减一。

smss源码阅读

相关的源码文件为sms_server,work_group,work_thread和socket_manager

服务初始化,注册链接监听事件并初始化线程组

bool SmsServer::Init()
{
    // 建立事件循环
    ev_base_ = event_base_new();
    if (!ev_base_)
    {
        return false;
    }
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port_);
    // 建立socket链接回调
    ev_listener_ = evconnlistener_new_bind(
        ev_base_,
        SConnListenerCb,
        this,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
        this->backlog_,
        (sockaddr *)&sin,
        sizeof(sin));
    if (!ev_listener_)
    {
        return false;
    }
    // 建立线程组管理类
    boss_ = new WorkGroup(thread_num_);
    boss_->Init();
    return true;
}

线程组负责管理全部的线程

bool WorkGroup::Init()
{
    // 直接初始化指定的工做线程
    for (int i = 0; i < num_; i++)
    {
        int id = group_.size() + 1;
        WorkThread *work = new WorkThread(this, id, net_bus_);
        if (!work->Init())
        {
            return false;
        }
        work->Start(); // thread start...
        group_.push_back(work);
        // 将当前初始化完成的工做线程注册进消息总线
        net_bus_->Regist(work); // regist thread to netbus
    }
    return true;
}

每个线程在初始化的时候会建立一条管道并在本身的事件循环上注册对应的读回调,对外部暴露Notify方法用来激活事件

bool WorkThread::Init()
{
    // 初始化一对管道,只能在linux系统下使用
    int pipefd[2];
    if (pipe(pipefd))
    {
        return false;
    }
    // pipefd[0]读取管道 pipefd[1]发送管道
    this->pipe_endpoint_ = pipefd[1];
    // 建立管道事件
    event_config *ev_conf = event_config_new();
    event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
    this->ev_base_ = event_base_new_with_config(ev_conf);
    event_config_free(ev_conf);
    if (!ev_base_)
    {
        return false;
    }

    pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
    event_add(pipe_ev_, 0);
    return true;
}

void WorkThread::Notify(const char *sign)
{
    // 激活
    int re = write(this->pipe_endpoint_, sign, 1);
    if (re <= 0)
    {
        LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "管道激活失败");
    }
}

在读回调中获取套接字,建立链接管理对象SocketManager

void WorkThread::Activated(int fd)
{
    char buf[2] = {0};
    int re = read(fd, buf, 1);
    socket_list_mtx_.lock();
    if (strcmp(buf, "c") == 0) // 通知有新的客户端链接
    {
        // new client connect, create SocketManager
        if (socket_list_.empty())
        {
            socket_list_mtx_.unlock();
            return;
        }
        // 读取一条套接字
        int client_sock = socket_list_.front();
        socket_list_.pop_front();
        // 建立socketManager
        SocketManager *manager = new SocketManager(this, ev_base_, client_sock, AppContext::Get()->client_timeout());
        manager->Init();
        sock_manager_list_.push_back(manager);
        stringstream ss;
        ss << "SocketManager:" << client_sock << " 建立完成";
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), ss.str());
    }

    socket_list_mtx_.unlock();
}

客户端链接后将建立的套接字交给负载最小的线程处理

void WorkGroup::CreateConnection(int sock)
{
    int min = -1;
    WorkThread *work = nullptr;
    // 遍历寻找负载最轻的线程
    for (auto it = group_.begin(); it != group_.end(); it++)
    {
        if (min == -1)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
        else if ((*it)->connect_num() < min)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
    }
    // 添加一条socket fd进队列并经过管道激活
    work->AddSocket(sock);
    work->Notify("c");
}

 

完整源码已经发布在码云上。

相关文章:《开源项目SMSS开发指南》

相关文章
相关标签/搜索