彻底靠内核驱动,只要某个文件描述符有变化,就会一直通知咱们的应用程序,直处处理完毕为止。ios
此种状况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,若是应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会形成丢包的。 这时应用程序须要本身维护一张fds的表格,把从epoll_wait获得的状态信息登记到这张表格,而后应用程序能够选择遍历这张表格,对处于忙碌状态的fds进行操做。程序员
ET比LT对应用程序的要求更多,须要程序员设计的部分也更多,看上去LT好像要简单不少,可是当咱们要求对fd有超时控制时,LT也一样须要对fds进行遍历,此时不如使用原本就要遍历的ET。 并且因为epoll_wait每次返回的fds数量是有限的,在大并发的模式下,LT将很是的繁忙,全部的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。 而ET只要epoll_wait返回一次fds以后,这些fds就会从队列中删除,只有当fd从新变为空闲状态时才从新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减小的,这样在大并发的状况下,ET模式将会更加具备优点。编程
int epoll_create(int size); //建立一个epoll的句柄,size用来告诉内核要监听的数目
复制代码
返回值: >0 返回建立成功的epoll句柄 -1 失败数组
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
复制代码
epoll的事件注册函数, 注册要监听的事件类型: 参数说明:bash
struct epoll_event结构以下:服务器
struct epoll_event {
__uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等
epoll_data_t data; //一个联合体,详细介绍见下面
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
复制代码
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
复制代码
参数说明以下:网络
功能说明: 等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,若是发生则将发生的sokct fd和事件类型放入到events数组中。 并 且将注册在epfd上的socket fd的事件类型给清空,因此若是下一个循环你还要关注这个socket fd的话,则须要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来从新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,由于socket fd并未清空,只是事件类型清空。这一步很是重要。当epoll_wait返回时根据返回值(大于0)调用accept。并发
socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close异步
首先对CTCP类作一下补充,将socket设置为非阻塞:socket
int CTcp::SetNoblock (int nSock)
{
assert (m_nSock != -1);
int nFlags;
if ( nSock == -1 )
{
nSock = m_nSock;
}
if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
return 0;
nFlags = nFlags | O_NONBLOCK;
if (fcntl (nSock, F_SETFL, nFlags) < 0)
return 0;
return 1;
}
复制代码
而后基于CTCP类,实现CEpollServer类,代码以下:
//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__
#include "SxTcp.h"
//Tcp类
class CEpollServer {
//构造函数
public:
CEpollServer ();
virtual ~CEpollServer ();
//公有成员函数
public:
int CreateEpoll(const char* szIp, int nPort, int nSize);
int ProcessEpoll();
int CloseEpoll();
//私有成员变量
private:
CTcp m_cTcp;
int m_nEpollFd;
};
#endif
复制代码
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
CEpollServer::CEpollServer ()
{
m_nEpollFd = -1;
}
CEpollServer::~CEpollServer ()
{
CloseEpoll();
m_cTcp.Close();
}
/*建立epoll句柄 入参: szIp 服务器ip地址 nPort 要绑定的端口 nSize 要监听的文件描述符数量 出参:1: 成功 ; 0: 失败 */
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
assert(szIp != nullptr);
int iRet = 0;
int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
iRet = m_cTcp.Open();
if ( iRet == 0 )
{
return SOCKET_ERROR;
}
iRet = m_cTcp.Bind(szIp, nPort);
if ( iRet == 0 )
{
return BIND_ERROR;
}
iRet = m_cTcp.SetNoblock();
if ( iRet == 0 )
{
return SETSOCKOPT_ERROR;
}
iRet = m_cTcp.Listen(nSize+1);//监听描述符数量要比epoll的多?
if ( iRet == 0)
{
return LISTEN_ERROR;
}
if ( m_nEpollFd != -1 )
{
CloseEpoll();
}
m_nEpollFd = epoll_create(size);
if ( m_nEpollFd == -1)
{
return EPOLL_CREATE_ERROR;
}
return 1;
}
/*处理epoll事件 出参:1: 成功 ; 0: 失败 */
int CEpollServer::ProcessEpoll()
{
assert(m_nEpollFd != -1);
int nFds = 0;
int connFd = -1, readFd = -1, writeFd = -1;
int n = 0, nSize = 0;
int nListenFd = -1;
char buf[MAX_READ_SIZE] = {0};
struct sockaddr_in clientAddr;
socklen_t clilen;
struct epoll_event ev, events[20];
memset((void*)&ev, 0, sizeof(ev));
nListenFd = m_cTcp.GetHandle();
ev.data.fd = nListenFd;
ev.events = EPOLLIN|EPOLLET;
if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
{
return EPOLL_CTL_ERROR;
}
while(1)
{
n = 0;
nSize = 0;
nFds = epoll_wait(m_nEpollFd, events, 20, 500);
for (int i = 0; i< nFds; ++i)
{
memset(buf, 0, MAX_READ_SIZE);
if (events[i].data.fd == nListenFd )
{
while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
{
m_cTcp.SetNoblock(connFd); //ET模式需设置为非阻塞的
ev.data.fd = connFd;
ev.events = EPOLLIN|EPOLLET;
if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
{
return EPOLL_CTL_ERROR;
}
}
if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
{
return ACCEPT_ERROR;
}
continue;
}
else if(events[i].events & EPOLLIN)
{
readFd = events[i].data.fd;
if (readFd < 0)
{
continue;
}
//读取数据
while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )
{
n += nSize;
}
//EAGAIN说明读到结尾了
if (nSize == -1 && errno != EAGAIN )
{
fprintf(stderr, "epoll read failed\n");
//ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
}
fprintf(stdout, "read data is:%s\n", buf);
ev.data.fd = readFd;
ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET)
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
}
else if(events[i].events & EPOLLOUT)
{
writeFd = events[i].data.fd;
//写数据
strncpy(buf, "hello client", sizeof(buf)-1);
int dataSize = strlen(buf);
n = dataSize;
while(n > 0)
{
nSize = write(writeFd, buf + dataSize - n, n);
if (nSize < n)
{
if (nSize == -1 && errno != EAGAIN)
{
break;
}
}
n -= nSize;
}
ev.data.fd = writeFd;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
}
}
}
}
/* 关闭epoll文件描述符 */
int CEpollServer::CloseEpoll()
{
if (m_nEpollFd != -1)
{
close (m_nEpollFd);
m_nEpollFd = -1;
}
return 1;
}
复制代码
将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile以下:
LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g
%.o:%.cpp
$(CC) -c $< $(PIC)
network:$(obj)
$(CC) -o libnetwork.so $^ $(LIBSO)
cp -f libnetwork.so ../test/lib
clean:
rm -f *.o *.so
复制代码
而后实现TestEpollServer.cpp,以下: 注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,须要修改!
#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"
CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"
//epoll server
int epoll_server_init() {
int iRet = -1;
string strIp;
int nPort = 0, nEpollNum = 0, nTimeout = 0;
ConfigIni::Init(string(FILEDIR));
strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
if (strIp == "")
{
SingleLog::WriteLog(ERROR,"read server addr failed");
return iRet;
}
nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
if ( nPort == -1 )
{
SingleLog::WriteLog(ERROR,"read server port failed");
return iRet;
}
nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
if ( nEpollNum == -1 )
{
SingleLog::WriteLog(ERROR,"read server epoll num failed");
return iRet;
}
nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
if ( nTimeout == -1 )
{
SingleLog::WriteLog(ERROR,"read server timeout failed");
return iRet;
}
iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
if ( iRet == 0 )
{
SingleLog::WriteLog(ERROR, "epoll create failed");
return -1;
}
return 0;
}
void epoll_server_run() {
g_clEpollServer.ProcessEpoll();
}
int main() {
SingleLog::Init();
if (epoll_server_init() == -1)
{
return -1;
}
epoll_server_run();
return 0;
}
复制代码
//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include "../../network/SxTcp.h"
using namespace std;
int main() {
CTcp tcp;
int iRet = 0;
int iFd = 0;
char buf[128] = {0};
iRet = tcp.Open();
if (iRet == 0)
{
perror("socket create failed");
return -1;
}
iRet = tcp.Connect("192.168.233.250", 6666);
if (iRet == 0)
{
perror("socket connect failed");
return -1;
}
while(1)
{
memset(buf, 0, sizeof(buf));
cout << "please input some string:";
cin >> buf;
iRet = tcp.Send(buf, strlen(buf));
if (iRet < -1 && errno != EAGAIN)
{
perror("send failed");
return -1;
}
else if(iRet == 0)
{
perror("connect is closed");
return -1;
}
memset(buf, 0, sizeof(buf));
iRet = tcp.Recv(buf, sizeof(buf));
if (iRet < 0 && errno != EAGAIN)
{
perror("recv failed");
return -1;
}
else if(iRet == 0)
{
perror("socket not connect");
return -1;
}
fprintf(stdout, "recv data is:%s\n", buf);
}
return 0;
}
复制代码
分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,便可实现通讯。