构建现代的server应用程序需要以某种方法同一时候接收数百、数千甚至数万个事件,无论它们是内部请求仍是网络链接,都要有效地处理它们的操做。php
有不少解决方式,但事件驱动也被普遍应用到网络编程中。并大规模部署在高链接数高吞吐量的server程序中,如 http server程序、ftp server程序等。html
相比于传统的网络编程方式,事件驱动能够极大的减小资源占用,增大服务接待能力,并提升网络传输效率。mysql
这些事件驱动模型中, libevent 库和 libev库能够大大提升性能和事件处理能力。react
在本文中。咱们要讨论在 UNIX/Linux 应用程序中使用和部署这些解决方式所用的基本结构和方法。linux
libev 和 libevent 都可以在高性能应用程序中使用。ios
在讨论libev 和 libevent以前,咱们看看I/O模型演进变化历史web
一、堵塞网络接口:处理单个clientsql
咱们 第一次接触到的网络编程通常都是从 listen() 、 send() 、 recv() 等接口開始的。使用这些接口可以很是方便的构建server / 客户机的模型。堵塞I/O模型图:在调用recv()函数时,发生在内核中等待数据和复制数据的过程。数据库
当调用recv()函数时。系统首先查是否有准备好的数据。假设数据没有准备好,那么系统就处于等待状态。当数据准备好后,将数据从系统缓冲区拷贝到用户空间,而后该函数返回。在套接应用程序中,当调用recv()函数时,未必用户空间就已经存在数据,那么此时recv()函数就会处于等待状态。apache
咱们注意到。大部分的 socket 接口都是堵塞型的。所谓堵塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直堵塞,仅仅有当该系统调用得到结果或者超时出错时才返回。
实际上,除非特别指定,差点儿所有的 IO 接口 ( 包含 socket 接口 ) 都是堵塞型的。这给网络编程带来了一个很是大的问题。如在调用 send() 的同一时候,线程将被堵塞,在此期间。线程将没法运行不论什么运算或响应不论什么的网络请求。这给多客户机、多业务逻辑的网络编程带来了挑战。这时。很是多程序猿可能会选择多线程的方式来解决问题。
使用堵塞模式的套接字,开发网络程序比較简单。easy实现。
当但愿能够立刻发送和接收数据。且处理的套接字数量比較少的状况下。即一个一个处理client,server没什么压力。使用堵塞模式来开发网络程序比較合适。
堵塞模式给网络编程带来了一个很是大的问题,如在调用 send()的同一时候。线程将被堵塞,在此期间,线程将没法运行不论什么运算或响应不论什么的网络请求。
假设很是多client同一时候訪问server,server就不能同一时候处理这些请求。这时,咱们可能会选择多线程的方式来解决问题。
二、多线程/进程处理多个client
应对多客户机的网络应用,最简单的解决方案是在server端使用多线程(或多进程)。多线程(或多进程)的目的是让每个链接都拥有独立的线程(或进程),这样不论什么一个链接的堵塞都不会影响其它的链接。
详细使用多进程仍是多线程,并无一个特定的模式。
传统意义上,进程的开销要远远大于线程,因此,假设需要同一时候为较多的客户机提供服务。则不推荐使用多进程;假设单个服务运行体需要消耗较多的 CPU 资源,譬如需要进行大规模或长时间的数据运算或文件訪问,则进程较为安全。一般,使用 pthread_create () 建立新线程。fork() 建立新进程。即:
(1) a new Connection 进来。用 fork() 产生一个 Process 处理。
(2) a new Connection 进来。用 pthread_create() 产生一个 Thread 处理。
多线程/进程server同一时候为多个客户机提供应答服务。模型例如如下:
主线程持续等待client的链接请求,假设有链接,则建立新线程,并在新线程中提供为前例相同的问答服务。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> void do_service(int conn); void err_log(string err, int sockfd) { perror("binding"); close(sockfd); exit(-1); } int main(int argc, char *argv[]) { unsigned short port = 8000; int sockfd; sockfd = socket(AF_INET, SOCK_STREAM, 0);// 建立通讯端点:套接字 if(sockfd < 0) { perror("socket"); exit(-1); } struct sockaddr_in my_addr; bzero(&my_addr, sizeof(my_addr)); my_addr.sin_family = AF_INET; my_addr.sin_port = htons(port); my_addr.sin_addr.s_addr = htonl(INADDR_ANY); int err_log = bind(sockfd, (struct sockaddr*)&my_addr, sizeof(my_addr)); if( err_log != 0) err_log("binding"); err_log = listen(sockfd, 10); if(err_log != 0) err_log("listen"); struct sockaddr_in peeraddr; //传出參数 socklen_t peerlen = sizeof(peeraddr); //传入传出參数。必须有初始值 int conn; // 已链接套接字(变为主动套接字,即可以主动connect) pid_t pid; while (1) { if ((conn = accept(sockfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0) //3次握手完毕的序列 err_log("accept error"); printf("recv connect ip=%s port=%d/n", inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port)); pid = fork(); if (pid == -1) err_log("fork error"); if (pid == 0) {// 子进程 close(listenfd); do_service(conn); exit(EXIT_SUCCESS); } else close(conn); //父进程 } return 0; } void do_service(int conn) { char recvbuf[1024]; while (1) { memset(recvbuf, 0, sizeof(recvbuf)); int ret = read(conn, recvbuf, sizeof(recvbuf)); if (ret == 0) { //客户端关闭了 printf("client close/n"); break; } else if (ret == -1) ERR_EXIT("read error"); fputs(recvbuf, stdout); write(conn, recvbuf, ret); } }
很是多刚開始学习的人可能不明确为什么一个 socket 能够 accept 屡次。实际上,socket 的设计者可能特地为多客户机的状况留下了伏笔,让 accept() 能够返回一个新的 socket。如下是 accept 接口的原型:
int accept(int s, struct sockaddr *addr, socklen_t *addrlen);
输入參数 s 是从 socket()。bind() 和 listen() 中沿用下来的 socket 句柄值。运行完 bind() 和 listen() 后,操做系统已经開始在指定的port处监听所有的链接请求。假设有请求。则将该链接请求增长请求队列。
调用 accept() 接口正是从 socket s 的请求队列抽取第一个链接信息,建立一个与 s 同类的新的 socket 返回句柄。新的 socket 句柄便是兴许 read() 和 recv() 的输入參数。假设请求队列当前没有请求。则 accept() 将进入堵塞状态直到有请求进入队列。
上述多线程的server模型彷佛完美的攻克了为多个客户机提供问答服务的要求,但事实上并不尽然。假设要同一时候响应成百上千路的链接请求,则无论多线程仍是多进程都会严重占领系统资源,减小系统对外界响应效率。而线程与进程自己也更easy进入假死状态。
所以其缺点:
1)用 fork() 的问题在于每一个 Connection 进来时的成本过高,假设同一时候接入的并发链接数太多easy进程数量很是多,进程之间的切换开销会很是大,同一时候对于老的内核(Linux)会产生雪崩效应。
2)用 Multi-thread 的问题在于 Thread-safe 与 Deadlock 问题难以解决。另外有 Memory-leak 的问题要处理,这个问题对于很是多程序猿来讲无异于恶梦,尤为是对于连续server的server程序更是不可以接受。 假设才用 Event-based 的方式在于实作上很差写,尤为是要注意到事件产生时必须 Nonblocking。因而会需要实作 Buffering 的问题。而 Multi-thread 所会遇到的 Memory-leak 问题在这边会更严重。
而在多 CPU 的系统上没有办法使用到所有的 CPU resource。
由此可能会考虑使用“线程池”或“链接池”。
“线程池”旨在下降建立和销毁线程的频率,其维持必定合理数量的线程。并让空暇的线程又一次承担新的运行任务。“链接池”维持链接的缓存池。尽可能重用已有的链接、下降建立和关闭链接的频率。这两种技术都可以很是好的下降系统开销,都被普遍应用很是多大型系统。如apache,mysql数据库等。
但是,“线程池”和“链接池”技术也仅仅是在必定程度上缓解了频繁调用 IO 接口带来的资源占用。而且。所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。
因此使用“池”必须考虑其面临的响应规模,并依据响应规模调整“池”的大小。
相应上例中的所面临的可能同一时候出现的上千甚至上万次的client请求,“线程池”或“链接池”也许可以缓解部分压力,但是不能解决所有问题。
因为多线程/进程致使过多的占用内存或 CPU等系统资源。
三、非堵塞的server模型
以上面临的很是多问题,必定程度是 IO 接口的堵塞特性致使的。
多线程是一个解决方式,还一个方案就是使用非堵塞的接口。
非堵塞的接口相比于堵塞型接口的显著差别在于。在被调用以后立刻返回。
使用例如如下的函数可以将某句柄 fd 设为非堵塞状态。
咱们可以使用 fcntl(fd, F_SETFL, flag | O_NONBLOCK); 将套接字标志变成非堵塞:
fcntl( fd, F_SETFL, O_NONBLOCK );
如下将给出仅仅用一个线程。但能够同一时候从多个链接中检測数据是否送达,并且接受数据。
在非堵塞状态下,recv() 接口在被调用后立刻返回,返回值表明了不一样的含义。
调用recv。假设设备临时没有数据可读就返回-1,同一时候置errno为EWOULDBLOCK(或者EAGAIN,这两个宏定义的值一样)。表示原本应该堵塞在这里(would block,虚拟语气),其实并无堵塞而是直接返回错误,调用者应该试着再读一次(again)。
这样的行为方式称为轮询(Poll)。调用者仅仅是查询一下。而不是堵塞在这里死等
如在本例中,
这样可以同一时候监视多个设备:
while(1){
非堵塞read(设备1);
if(设备1有数据到达)
处理数据;
非堵塞read(设备2);
if(设备2有数据到达)
处理数据;
..............................
}
假设read(设备1)是堵塞的,那么仅仅要设备1没有数据到达就会一直堵塞在设备1的read调用上,即便设备2有数据到达也不能处理,使用非堵塞I/O就可以避免设备2得不到及时处理。
相似一个快递的样例:这里使用忙轮询的方法:每隔1微妙(while(1)差点儿不间断)到A楼一层(内核缓冲区)去看快递来了没有。假设没来。立刻返回。
而快递来了,就放在A楼一层,等你去取。
非堵塞I/O有一个缺点,假设所有设备都一直没有数据到达,调用者需要重复查询作无用功,假设堵塞在那里。操做系统可以调度别的进程运行,就不会作无用功了。在实际应用中非堵塞I/O模型比較少用。
可以看到server线程可以经过循环调用 recv() 接口。可以在单个线程内实现对所有链接的数据接收工做。
但是上述模型毫不被推荐。因为。循环调用 recv() 将大幅度推高 CPU 占用率。此外。在这个方法中,recv() 不少其它的是起到检測“操做是否完毕”的做用,实际操做系统提供了更为高效的检測“操做是否完毕“做用的接口。好比 select()。
四、IO复用事件驱动server模型
简单介绍:主要是select和epoll;对一个IOport,两次调用,两次返回。比堵塞IO并无什么优越性。关键是能实现同一时候对多个IOport进行监听;
I/O复用模型会用到select、poll、epoll函数。这几个函数也会使进程堵塞,但是和堵塞I/O所不一样的的,这两个函数可以同一时候堵塞多个I/O操做。而且可以同一时候对多个读操做,多个写操做的I/O函数进行检測,直到有数据可读或可写时,才真正调用I/O操做函数。
咱们先具体解释select:
SELECT函数进行IO复用server模型的原理是:当一个client链接上server时。server就将其链接的fd增长fd_set集合,等到这个链接准备好读或写的时候,就通知程序进行IO操做,与client进行数据通讯。
大部分 Unix/Linux 都支持 select 函数。该函数用于探測多个文件句柄的状态变化。
FD_ZERO(int fd, fd_set* fds) FD_SET(int fd, fd_set* fds) FD_ISSET(int fd, fd_set* fds) FD_CLR(int fd, fd_set* fds) int select( int maxfdp, //Winsock中此參数无心义 fd_set* readfds, //进行可读检測的Socket fd_set* writefds, //进行可写检測的Socket fd_set* exceptfds, //进行异常检測的Socket const struct timeval* timeout //非堵塞模式中设置最大等待时间 )
參数列表:
int maxfdp :是一个整数值,意思是“最大fd加1(max fd plus 1). 在三个描写叙述符集(readfds, writefds, exceptfds)中找出最高描写叙述符
编号值,而后加 1也可将maxfdp设置为 FD_SETSIZE。这是一个< sys/types.h >中的常数,它说明了最大的描写叙述符数(经常是 256或1024) 。
但是对大多数应用程序而言,此值太大了。
确实,大多数应用程序仅仅应用 3 ~ 1 0个描写叙述符。假设将第三个參数设置为最高描写叙述符编号值加 1,内核就仅仅需在此范围内寻找打开的位。而没必要在数百位的大范围内搜索。
fd_set *readfds: 是指向fd_set结构的指针,这个集合中应该包含文件描写叙述符,咱们是要监视这些文件描写叙述符的读变化的。即咱们关
心可否够从这些文件里读取数据了,假设这个集合中有一个文件可读,select就会返回一个大于0的值。表示有文件可读,假设没有可读的文件。则依据timeout參数再推断是否超时,若超出timeout的时间,select返回0,若错误发生返回负值。可以传入NULL值。表示不关心不论什么文件的读变化。
fd_set *writefds: 是指向fd_set结构的指针,这个集合中应该包含文件描写叙述符,咱们是要监视这些文件描写叙述符的写变化的,即咱们关
心可否够向这些文件里写入数据了,假设这个集合中有一个文件可写,select就会返回一个大于0的值,表示有文件可写,假设没有可写的文件,则依据timeout參数再推断是否超时,若超出timeout的时间,select返回0。若错误发生返回负值。可以传入NULL值,表示不关心不论什么文件的写变化。
fd_set *errorfds: 同上面两个參数的意图,用来监视文件错误异常。
readfds , writefds,*errorfds每个描写叙述符集存放在一个fd_set 数据类型中.如图:
struct timeval* timeout :是select的超时时间,这个參数相当重要。它可以使select处于三种状态:
第一,若将NULL以形參传入,即不传入时间结构,就是将select置于堵塞状态,必定等到监视文件描写叙述符集合中某个文件描写叙述符发生变化为止。
第二,若将时间值设为0秒0毫秒,就变成一个纯粹的非堵塞函数,不管文件描写叙述符是否有变化,都立马返回继续运行。文件无变化返回0,有变化返回一个正值;
第三,timeout的值大于0,这就是等待的超时时间,即 select在timeout时间内堵塞。超时时间以内有事件到来就返回了。不然在超时后不管如何必定返回,返回值同上述。
这里需要注意的一点是,select的堵塞与是否设置非堵塞I/O是没有关系的。
/* 可读、可写、异常三种文件描写叙述符集的申明和初始化。*/ fd_set readfds, writefds, exceptionfds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptionfds); int max_fd; /* socket配置和监听。
*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 对socket描写叙述符上发生关心的事件进行注冊。
*/ FD_SET(&readfds, sock); max_fd = sock; while(1) { int i; fd_set r,w,e; /* 为了反复使用readfds 、writefds、exceptionfds,将它们复制到暂时变量内。*/ memcpy(&r, &readfds, sizeof(fd_set)); memcpy(&w, &writefds, sizeof(fd_set)); memcpy(&e, &exceptionfds, sizeof(fd_set)); /* 利用暂时变量调用select()堵塞等待。timeout=null表示等待时间为永远等待直到发生事件。*/ select(max_fd + 1, &r, &w, &e, NULL); /* 測试是否有client发起链接请求,假设有则接受并把新建的描写叙述符增长监控。*/ if(FD_ISSET(&r, sock)){ new_sock = accept(sock, ...); FD_SET(&readfds, new_sock); FD_SET(&writefds, new_sock); max_fd = MAX(max_fd, new_sock); } /* 对其余描写叙述符发生的事件进行适当处理。描写叙述符依次递增,最大值各系统有所不一样(比方在做者系统上最大为1024)。 在linux可以用命令ulimit -a查看(用ulimit命令也对该值进行改动)。
在freebsd下,用sysctl -a | grep kern.maxfilesperproc来查询和改动。
*/ for(i= sock+1; i <max_fd+1; ++i) { if(FD_ISSET(&r, i)) doReadAction(i); if(FD_ISSET(&w, i)) doWriteAction(i); } }
FD_ZERO(int fd, fd_set* fds) //清除其所有位
FD_SET(int fd, fd_set* fds) //在某 fd_set 中标记一个fd的相应位为1
FD_ISSET(int fd, fd_set* fds) // 測试该集中的一个给定位是否仍旧设置
FD_CLR(int fd, fd_set* fds) //删除相应位
这里,fd_set 类型可以简单的理解为按 bit 位标记句柄的队列,好比要在某 fd_set 中标记一个值为 16 的句柄,则该 fd_set 的第 16 个 bit 位被标记为 1。详细的置位、验证可以使用 FD_SET、FD_ISSET 等宏实现。
好比,编写下列代码:
fd_setreadset,writeset; FD_ZERO(&readset); FD_ZERO(&writeset); FD_SET(0,&readset); FD_SET(3,&readset); FD_SET(1,&writeset); FD_SET(2,&writeset); select(4,&readset,&writeset,NULL,NULL);而后,下图显示了这两个描写叙述符集的状况:
因为描写叙述符编号从0開始,因此要在最大描写叙述符编号值上加1。
第一个參数其实是要检查的描写叙述符数(从描写叙述符0開始)。
若指定的描写叙述符都没有准备好,而且指定的时间已经超过。则发生这样的状况。
(3)返回一个正值说明了已经准备好的描写叙述符数,在这样的状况下,三个描写叙述符集中仍旧打开的位是相应于已准备好的描写叙述符位。
如下将又一次模拟上例中从多个client接收数据的模型。
使用select()的接收数据模型上述模型仅仅是描写叙述了使用 select() 接口同一时候从多个client接收数据的过程;由于 select() 接口可以同一时候对多个句柄进行读状态、写状态和错误状态的探測。因此可以很是easy构建为多个client提供独立问答服务的server系统。
使用select()接口的基于事件驱动的server模型
这里需要指出的是。client的一个 connect() 操做,将在server端激发一个“可读事件”,因此 select() 也能探測来自client的 connect() 行为。
上述模型中,最关键的地方是怎样动态维护 select() 的三个參数 readfds、writefds 和 exceptfds。做为输入參数,readfds 应该标记所有的需要探測的“可读事件”的句柄,当中永远包含那个探測 connect() 的那个“母”句柄;同一时候,writefds 和 exceptfds 应该标记所有需要探測的“可写事件”和“错误事件”的句柄 ( 使用 FD_SET() 标记 )。
做为输出參数,readfds、writefds 和 exceptfds 中的保存了 select() 捕捉到的所有事件的句柄值。程序猿需要检查的所有的标记位 ( 使用 FD_ISSET() 检查 ),以肯定究竟哪些句柄发生了事件。
上述模型主要模拟的是“一问一答”的服务流程,因此,假设 select() 发现某句柄捕捉到了“可读事件”,server程序应及时作 recv() 操做。并依据接收到的数据准备好待发送数据。并将相应的句柄值增长 writefds,准备下一次的“可写事件”的 select() 探測。相同,假设 select() 发现某句柄捕捉到“可写事件”,则程序应及时作 send() 操做,并准备好下一次的“可读事件”探測准备。
下图描写叙述的是上述模型中的一个运行周期。
一个运行周期
这样的模型的特征在于每一个运行周期都会探測一次或一组事件。一个特定的事件会触发某个特定的响应。咱们可以将这样的模型归类为“事件驱动模型”。
相比其它模型,使用 select() 的事件驱动模型仅仅用单线程(进程)运行,占用资源少,不消耗太多 CPU,同一时候能够为多client提供服务。
假设试图创建一个简单的事件驱动的server程序,这个模型有必定的參考价值。但这个模型依然有着很是多问题。
select的缺点:
(1)单个进程能够监视的文件描写叙述符的数量存在最大限制
(2)select需要复制大量的句柄数据结构。产生巨大的开销
(3)select返回的是含有整个句柄的列表,应用程序需要消耗大量时间去轮询各个句柄才干发现哪些句柄发生了事件
(4)select的触发方式是水平触发,应用程序假设没有完毕对一个已经就绪的文件描写叙述符进行IO操做,那么以后每次select调用仍是会将这些文件描写叙述符通知进程。相相应方式的是边缘触发。
(6) 该模型将事件探測和事件响应夹杂在一块儿。一旦事件响应的运行体庞大,则对整个模型是灾难性的。例如如下例。庞大的运行体 1 的将直接致使响应事件 2 的运行体迟迟得不到运行,并在很是大程度上减小了事件探測的及时性。
庞大的运行体对使用select()的事件驱动模型的影响
很是多操做系统提供了更为高效的接口,如 linux 提供了 epoll,BSD 提供了 kqueue。Solaris 提供了 /dev/poll …。
假设需要实现更高效的server程序,相似 epoll 这种接口更被推荐。
所以。poll有着与select类似的处理流程:
如下对epoll的使用进行说明:
在这样的状况下,仅仅要有数据没有读、写完,调用epoll_wait()的时候,就会有事件被触发。
/* 新建并初始化文件描写叙述符集。*/ struct epoll_event ev; struct epoll_event events[MAX_EVENTS]; /* 建立epoll句柄。*/ int epfd = epoll_create(MAX_EVENTS); /* socket配置和监听。*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 对socket描写叙述符上发生关心的事件进行注冊。*/ ev.events = EPOLLIN; ev.data.fd = sock; epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); while(1) { int i; /*调用epoll_wait()堵塞等待。等待时间为永远等待直到发生事件。*/ int n = epoll_wait(epfd, events, MAX_EVENTS, -1); for(i=0; i <n; ++i) { /* 測试是否有client发起链接请求,假设有则接受并把新建的描写叙述符增长监控。*/ if(events.data.fd == sock) { if(events.events & POLLIN){ new_sock = accept(sock, ...); ev.events = EPOLLIN | POLLOUT; ev.data.fd = new_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, new_sock, &ev); } }else{ /* 对其余描写叙述符发生的事件进行适当处理。*/ if(events.events & POLLIN) doReadAction(i); if(events.events & POLLOUT) doWriteAction(i); } } }
这两个词来源于计算机硬件设计。
它们的差异是仅仅要句柄知足某种状态,水平触发就会发出通知。而仅仅有当句柄状态改变时。边缘触发才会发出通知。好比一个socket通过长时间等待后接收到一段100k的数据,两种触发方式都会向程序发出就绪通知。若是程序从这个socket中读取了50k数据。并再次调用监听函数,水平触发依旧会发出就绪通知,而边缘触发会因为socket“有数据可读”这个状态没有发生变化而不发出通知且陷入长时间的等待。
所以在使用边缘触发的 api 时,要注意每次都要读到 socket返回 EWOULDBLOCK为止
遗憾的是不一样的操做系统特供的 epoll 接口有很是大差别,因此使用相似于 epoll 的接口实现具备较好跨平台能力的server会比較困难。
幸运的是,有很是多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有 libevent 库。还有做为 libevent 替代者的 libev 库。
这些库会依据操做系统的特色选择最合适的事件探測接口。并且增长了信号 (signal) 等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。
下章将介绍怎样使用 libev 库替换 select 或 epoll 接口。实现高效稳定的server模型。
五、libevent方法
libevent是一个事件触发的网络库,适用于windows、linux、bsd等多种平台,内部使用select、epoll、kqueue等系统调用管理事件机制。著名分布式缓存软件memcached也是libevent based,而且libevent在使用上可以作到跨平台。而且依据libevent官方站点上发布的数据统计,彷佛也有着非凡的性能。
libevent 库实际上没有更换 select()
、poll()
或其它机制的基础。而是使用对于每个平台最高效的高性能解决方式在实现外加上一个包装器。
为了实际处理每个请求,libevent 库提供一种事件机制。它做为底层网络后端的包装器。
事件系统让为链接加入处理函数变得很简便,同一时候减小了底层 I/O 复杂性。这是 libevent 系统的核心。
libevent 库的其它组件提供其它功能。包含缓冲的事件系统(用于缓冲发送到client/从client接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。
1)事件驱动,高性能;
2)轻量级,专一于网络。
3) 跨平台,支持 Windows、Linux、Mac Os等;
4) 支持多种 I/O多路复用技术, epoll、poll、dev/poll、select 和kqueue 等。
5) 支持 I/O。定时器和信号等事件
1)event 及 event_base事件管理包含各类IO(socket)、定时器、信号等事件,也是libevent应用最广的模块。
2 ) evbuffer event 及 event_base 缓存管理是指evbuffer功能;提供了高效的读写方法
3) evdns DNS是libevent提供的一个异步DNS查询功能;
4) evhttp HTTP是libevent的一个轻量级http实现,包含server和client
libevent也支持ssl,这对于有安全需求的网络程序很是的重要。但是其支持不是很是无缺,比方http server的实现就不支持ssl。
libevent是事件驱动的库,所谓事件驱动,简单地说就是你点什么button(即产生什么事件),电脑运行什么操做(即调用什么函数)。
Libevent框架本质上是一个典型的Reactor模式。因此仅仅需要弄懂Reactor模型。libevent就八九不离十了。
Reactor模式。是一种事件驱动机制。
应用程序需要提供对应的接口并注冊到Reactor上,假设对应的事件发生,Reactor将主动调用应用程序注冊的接口,这些接口又称为“回调函数”。
在Libevent中也是同样。向Libevent框架注冊对应的事件和回调函数;当这些事件发生时,Libevent会调用这些回调函数处理对应的事件(I/O读写、定时和信号)。
使用Reactor模型,必备的几个组件:事件源、Reactor框架、多路复用机制和事件处理程序,先来看看Reactor模型的整体框架,接下来再对每个组件作逐一说明。
1) 事件源
1) 2) event demultiplexer——事件多路分发机制
由操做系统提供的I/O多路复用机制,比方select和epoll。程序首先将其关心的句柄(事件源)及其事件注冊到event demultiplexer上。当有事件到达时,event demultiplexer会发出通知“在已经注冊的句柄集中,一个或多个句柄的事件已经就绪”;程序收到通知后,就可以在非堵塞的状况下对事件进行处理了。
相应到libevent中,依旧是select、poll、epoll等,但是libevent使用结构体eventop进行了封装,以统一的接口来支持这些I/O多路复用机制,达到了对外隐藏底层系统机制的目的。
3) Reactor——反应器
Reactor,是事件管理的接口,内部使用event demultiplexer注冊、注销事件;并执行事件循环。当有事件进入“就绪”状态时,调用注冊事件的回调函数处理事件。
相应到libevent中。就是event_base结构体。
4) Event Handler——事件处理程序
事件处理程序提供了一组接口,每个接口对应了一种类型的事件。供Reactor在对应的事件发生时调用,运行对应的事件处理。一般它会绑定一个有效的句柄。
相应到libevent中,就是event结构体。
结合Reactor框架,咱们来理一下libevent的事件处理流程,请看下图:
event_init() 初始化:
首先要隆重介绍event_base对象:
struct event_base { const struct eventop *evsel; void *evbase; int event_count; /* counts number of total events */ int event_count_active; /* counts number of active events */ int event_gotterm; /* Set to terminate loop */ /* active event management */ struct event_list **activequeues; int nactivequeues; struct event_list eventqueue; struct timeval event_tv; RB_HEAD(event_tree, event) timetree; };
event_base对象整合了事件处理的一些全局变量, 角色是event对象的"总管家", 他包含了:
事件引擎函数对象(evsel, evbase),
当前入列事件列表(event_count, event_count_active, eventqueue),
全局终止信号(event_gotterm),
活跃事件列表(avtivequeues),
事件队列树(timetree)...
初始化时建立event_base对象, 选择 当前OS支持的事件引擎(epoll, poll, select...)并初始化, 建立全局信号队列(signalqueue), 活跃队列的内存分配( 依据设置的priority个数,默以为1).
event_setevent_set来设置event对象,包含所有者event_base对象, fd, 事件(EV_READ| EV_WRITE|EV_PERSIST), 回掉函数和參数,事件优先级是当前event_base的中间级别(current_base->nactivequeues/2)
设置监视事件后,事件处理函数可以仅仅被调用一次或总被调用。
仅仅调用一次:事件处理函数被调用后,即从事件队列中删除。需要在事件处理函数中再次增长事件,才干在下次事件发生时被调用;
总被调用:设置为EV_PERSIST,仅仅增长一次,处理函数总被调用,除非採用event_remove显式地删除。
event_add() 事件加入:
int event_add(struct event *ev, struct timeval *tv)
这个接口有两个參数, 第一个是要加入的事件, 第二个參数做为事件的超时值(timer). 假设该值非NULL, 在加入本事件的同一时候加入超时事件(EV_TIMEOUT)到时间队列树(timetree), 依据事件类型处理例如如下:
EV_READ => EVLIST_INSERTED => eventqueue
EV_WRITE => EVLIST_INSERTED => eventqueue
EV_TIMEOUT => EVLIST_TIMEOUT => timetree
EV_SIGNAL => EVLIST_SIGNAL => signalqueue
event_base_loop() 事件处理主循环
这里是事件的主循环,仅仅要flags不是设置为EVLOOP_NONBLOCK, 该函数就会一直循环监听事件/处理事件.
每次循环过程当中, 都会处理当前触发(活跃)事件:
(a). 检測当前是否有信号处理(gotterm, gotsig), 这些都是全局參数,不适合多线程
(b). 时间更新,找到离当前近期的时间事件, 获得相对超时事件tv
(c). 调用事件引擎的dispatch wait事件触发, 超时值为tv, 触发事件加入到activequeues
(d). 处理活跃事件, 调用caller的callbacks (event_process_acitve)
典型的libevent的应用大体整体流程:
建立 libevent server的基本方法是, 注冊当发生某一操做(比方接受来自client的链接)时应该运行的函数,而后调用主事件循环event_dispatch()
。运行过程的控制现在由 libevent 系统处理。
注冊事件和将调用的函数以后,事件系统開始自治。在应用程序执行时,可以在事件队列中加入(注冊)或删除(取消注冊)事件。事件注冊很方便,可以经过它加入新事件以处理新打开的链接,从而构建灵活的网络处理系统
(环境设置)-> (建立event_base) -> (注冊event,将此event增长到event_base中) -> (设置event各类属性。事件等) ->(将event增长事件列表 addevent) ->(開始事件监视循环、分发dispatch)。
样例:
好比,可以打开一个监听套接字,而后注冊一个回调函数,每当需要调用 accept()
函数以打开新链接时调用这个回调函数,这样就建立了一个网络server。例1例如如下所看到的的代码片断说明基本过程:
例1:打开监听套接字,注冊一个回调函数(每当需要调用 accept() 函数以打开新链接时调用它),由此建立网络server:
#include <stdio.h> #include <string.h> #include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <event.h> using namespace std; // 事件base struct event_base* base; // 读事件回调函数 void onRead(int iCliFd, short iEvent, void *arg) { int iLen; char buf[1500]; iLen = recv(iCliFd, buf, 1500, 0); if (iLen <= 0) { cout << "Client Close" << endl; // 链接结束(=0)或链接错误(<0)。将事件删除并释放内存空间 struct event *pEvRead = (struct event*)arg; event_del(pEvRead); delete pEvRead; close(iCliFd); return; } buf[iLen] = 0; cout << "Client Info:" << buf << endl; } // 链接请求事件回调函数 void onAccept(int iSvrFd, short iEvent, void *arg) { int iCliFd; struct sockaddr_in sCliAddr; socklen_t iSinSize = sizeof(sCliAddr); iCliFd = accept(iSvrFd, (struct sockaddr*)&sCliAddr, &iSinSize); // 链接注冊为新事件 (EV_PERSIST为事件触发后不默认删除) struct event *pEvRead = new event; event_set(pEvRead, iCliFd, EV_READ|EV_PERSIST, onRead, pEvRead); event_base_set(base, pEvRead); event_add(pEvRead, NULL); } int main() { int iSvrFd; struct sockaddr_in sSvrAddr; memset(&sSvrAddr, 0, sizeof(sSvrAddr)); sSvrAddr.sin_family = AF_INET; sSvrAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); sSvrAddr.sin_port = htons(8888); // 建立tcpSocket(iSvrFd),监听本机8888端口 iSvrFd = socket(AF_INET, SOCK_STREAM, 0); bind(iSvrFd, (struct sockaddr*)&sSvrAddr, sizeof(sSvrAddr)); listen(iSvrFd, 10); // 初始化base base = event_base_new(); struct event evListen; // 设置事件 event_set(&evListen, iSvrFd, EV_READ|EV_PERSIST, onAccept, NULL); // 设置为base事件 event_base_set(base, &evListen); // 加入事件 event_add(&evListen, NULL); // 事件循环 event_base_dispatch(base); return 0; }
event_set()
函数建立新的事件结构,
event_add()
在事件队列机制中加入事件。
而后,event_dispatch()
启动事件队列系统,開始监听(并接受)请求。
虽然 C 语言很是适合不少系统应用程序。但是在现代环境中不经常使用 C 语言,脚本语言更灵活、更有用。
幸运的是,Perl 和 PHP 等大多数脚本语言是用 C 编写的,因此可以经过扩展模块使用 libevent 等 C 库。
四、libev库
官方文档:http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod
与 libevent 同样,libev 系统也是基于事件循环的系统,它在 poll()
、select()
等机制的本机实现的基础上提供基于事件的循环。
libev是libevent以后的一个事件驱动的编程框架。其接口和libevent基本相似。据官方介绍。其性能比libevent还要高,bug比libevent还少。
libev API 比較原始,没有 HTTP 包装器。但是 libev 支持在实现中内置不少其它事件类型。好比,一种 evstat 实现可以监视多个文件的属性变更,可以在 例4 所看到的的 HTTP 文件解决方式中使用它。
但是,libevent 和 libev 的基本过程是一样的。建立所需的网络监听套接字。注冊在运行期间要调用的事件。而后启动主事件循环,让 libev 处理过程的其他部分。
Libev是一个event loop:向libev注冊感兴趣的events,比方Socket可读事件,libev会对所注冊的事件的源进行管理,并在事件发生时触发对应的程序。
事件驱动框架:
定义一个监控器、书写触发动做逻辑、初始化监控器、设置监控器触发条件、将监控器增长大事件驱动器的循环中就能够。
libev的事件驱动过程可以想象成例如如下的伪代码:
do_some_init() is_run = True while is_run: t = caculate_loop_time() deal_loop(t) deal_with_pending_event() do_some_clear()
首先作一些初始化操做,而后进入到循环中,该循环经过一个状态位来控制是否运行。
在循环中。计算出下一次轮询的时间,这里轮询的实现就採用了系统提供的epoll、kqueue等机制。
再轮询结束后检查有哪些监控器的被触发了,依次运行触发动做。
Libev 除了提供了主要的三大类事件(IO事件、定时器事件、信号事件)外还提供了周期事件、子进程事件、文件状态改变事件等多个事件。
libev所实现的功能就是一个强大的reactor,可能notify事件主要包含如下这些:
libev 相同需要循环探測事件是否产生。
Libev 的循环体用 ev_loop 结构来表达。并用 ev_loop( ) 来启动。
void ev_loop( ev_loop* loop, int flags ) |
Libev 支持八种事件类型,当中包含 IO 事件。
一个 IO 事件用 ev_io 来表征,并用 ev_io_init() 函数来初始化:
void ev_io_init(ev_io *io, callback, int fd, int events) |
初始化内容包含回调函数 callback,被探測的句柄 fd 和需要探測的事件。EV_READ 表“可读事件”。EV_WRITE 表“可写事件”。
现在,用户需要作的不过在合适的时候,将某些 ev_io 从 ev_loop 增长或剔除。一旦增长,下个循环即会检查 ev_io 所指定的事件有否发生;假设该事件被探測到,则 ev_loop 会本身主动运行 ev_io 的回调函数 callback();假设 ev_io 被注销。则再也不检測相应事件。
无论某 ev_loop 启动与否,都可以对其加入或删除一个或多个 ev_io,加入删除的接口是 ev_io_start() 和 ev_io_stop()。
void ev_io_start( ev_loop *loop, ev_io* io ) void ev_io_stop( EV_A_* ) |
由此,咱们可以easy得出例如如下的“一问一答”的server模型。由于没有考虑server端主动终止链接机制,因此各个链接可以维持随意时间,client可以自由选择退出时机。
IO事件、定时器事件、信号事件:
#include<ev.h> #include <stdio.h> #include <signal.h> #include <sys/unistd.h> ev_io io_w; ev_timer timer_w; ev_signal signal_w; void io_action(struct ev_loop *main_loop,ev_io *io_w,int e) { int rst; char buf[1024] = {''}; puts("in io cb\n"); read(STDIN_FILENO,buf,sizeof(buf)); buf[1023] = ''; printf("Read in a string %s \n",buf); ev_io_stop(main_loop,io_w); } void timer_action(struct ev_loop *main_loop,ev_timer *timer_w,int e) { puts("in tiemr cb \n"); ev_timer_stop(main_loop,io_w); } void signal_action(struct ev_loop *main_loop,ev_signal signal_w,int e) { puts("in signal cb \n"); ev_signal_stop(main_loop,io_w); ev_break(main_loop,EVBREAK_ALL); } int main(int argc ,char *argv[]) { struct ev_loop *main_loop = ev_default_loop(0); ev_init(&io_w,io_action); ev_io_set(&io_w,STDIN_FILENO,EV_READ); ev_init(&timer_w,timer_action); ev_timer_set(&timer_w,2,0); ev_init(&signal_w,signal_action); ev_signal_set(&signal_w,SIGINT); ev_io_start(main_loop,&io_w); ev_timer_start(main_loop,&timer_w); ev_signal_start(main_loop,&signal_w); ev_run(main_loop,0); return 0; }
这里使用了3种事件监控器,分别监控IO事件、定时器事件以及信号事件。所以定义了3个监控器(watcher),以及触发监控器时要运行动做的回调函数。Libev定义了多种监控器,命名方式为ev_xxx
这里xxx表明监控器类型,事实上现是一个结构体。
typedef struct ev_io { .... } ev_io;
经过宏定义可以简写为 ev_xxx
。
回调函数的类型为 void cb_name(struct ev_loop *main_loop,ev_xxx *io_w,int event)
。
在main中,首先定义了一个事件驱动器的结构 struct ev_loop *main_loop
这里调用 ev_default_loop(0)
生成一个预制的全局驱动器。这里可以參考Manual中的选择。
而后依次初始化各个监控器以及设置监控器的触发条件。
初始化监控器的过程是将对应的回调函数即触发时的动做注冊到监控器上。
设置触发条件则是该条件产生时才去运行注冊到监控器上的动做。对于IO事件,一般是设置特定fd上的的可读或可写事件,定时器则是多久后触发。这里定时器的触发条件中还有第三參数。表示第一次触发后,是否循环。若为0则吧循环,不然按该值循环。信号触发器则是设置触发的信号。
在初始化并设置好触发条件后,先调用ev_xxx_start
将监控器注冊到事件驱动器上。接着调用 ev_run
開始事件驱动器。
上述模型可以接受随意多个链接,且为各个链接提供全然独立的问答服务。借助 libev 提供的事件循环 / 事件驱动接口,上述模型有机会具有其它模型不能提供的高效率、低资源占用、稳定性好和编写简单等特色。
由于传统的 web server。ftp server及其它网络应用程序都具备“一问一答”的通信逻辑。因此上述使用 libev 库的“一问一答”模型对构建相似的server程序具备參考价值;另外,对于需要实现远程监视或远程遥控的应用程序,上述模型相同提供了一个可行的实现方案。
php-了libev扩展socket:<?php /* 使用异步io訪问socket Use some async I/O to access a socket */ // `sockets' extension still logs warnings // for EINPROGRESS, EAGAIN/EWOULDBLOCK etc. error_reporting(E_ERROR); $e_nonblocking = array (/*EAGAIN or EWOULDBLOCK*/11, /*EINPROGRESS*/115); // Get the port for the WWW service $service_port = getservbyname('www', 'tcp'); // Get the IP address for the target host $address = gethostbyname('google.co.uk'); // Create a TCP/IP socket $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { echo \"socket_create() failed: reason: \" .socket_strerror(socket_last_error()) . \"n\"; } // Set O_NONBLOCK flag socket_set_nonblock($socket); // Abort on timeout $timeout_watcher = new EvTimer(10.0, 0., function () use ($socket) { socket_close($socket); Ev::stop(Ev::BREAK_ALL); }); // Make HEAD request when the socket is writable $write_watcher = new EvIo($socket, Ev::WRITE, function ($w) use ($socket, $timeout_watcher, $e_nonblocking) { // Stop timeout watcher $timeout_watcher->stop(); // Stop write watcher $w->stop(); $in = \"HEAD / HTTP/1.1rn\"; $in .= \"Host: google.co.ukrn\"; $in .= \"Connection: Closernrn\"; if (!socket_write($socket, $in, strlen($in))) { trigger_error(\"Failed writing $in to socket\", E_USER_ERROR); } $read_watcher = new EvIo($socket, Ev::READ, function ($w, $re) use ($socket, $e_nonblocking) { // Socket is readable. recv() 20 bytes using non-blocking mode $ret = socket_recv($socket, $out, 20, MSG_DONTWAIT); if ($ret) { echo $out; } elseif ($ret === 0) { // All read $w->stop(); socket_close($socket); return; } // Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK if (in_array(socket_last_error(), $e_nonblocking)) { return; } $w->stop(); socket_close($socket); }); Ev::run(); }); $result = socket_connect($socket, $address, $service_port); Ev::run(); ?>
libevent 和 libev 都提供灵活且强大的环境。支持为处理server端或client请求实现高性能网络(和其它 I/O)接口。
目标是以高效(CPU/RAM 使用量低)的方式支持数千甚至数万个链接。在本文中,您看到了一些演示样例,包含 libevent 中内置的 HTTP 服务,可以使用这些技术支持基于 IBM Cloud、EC2 或 AJAX 的 web 应用程序。
參考:
http://www.ibm.com/developerworks/cn/linux/l-cn-edntwk/
http://www.ibm.com/developerworks/cn/aix/library/au-libev/