GitChat 做者:范蠡
原文:C++ 高性能服务器网络框架设计细节
关注微信公众号:「GitChat 技术杂谈」 一本正经的讲技术前端
【不要错过文末彩蛋】java
这篇文章咱们将介绍服务器的开发,并从多个方面探究如何开发一款高性能高并发的服务器程序。须要注意的是通常大型服务器,其复杂程度在于其业务,而不是在于其代码工程的基本框架。react
大型服务器通常有多个服务组成,可能会支持CDN,或者支持所谓的“分布式”等,这篇文章不会介绍这些东西,由于无论结构多么复杂的服务器,都是由单个服务器组成的。因此这篇文章的侧重点是讨论单个服务程序的结构,并且这里的结构指的也是单个服务器的网络通讯层结构,若是你能真正地理解了我所说的,那么在这个基础的结构上面开展任何业务都是能够的,也能够将这种结构扩展成复杂的多个服务器组,例如“分布式”服务。linux
文中的代码示例虽然是以C++为例,但一样适合Java(我本人也是Java开发者),原理都是同样的,只不过Java可能在基本的操做系统网络通讯API的基础上用虚拟机包裹了一层接口而已(Java甚至可能基于一些经常使用的网络通讯框架思想提供了一些现成的API,例如NIO)。有鉴于此,这篇文章不讨论那些大而空、泛泛而谈的技术术语,而是讲的是实实在在的能指导读者在实际工做中实践的编码方案或优化已有编码的方法。另外这里讨论的技术同时涉及windows和linux两个平台。git
所谓高性能就是服务器能流畅地处理各个客户端的链接并尽可能低延迟地应答客户端的请求;所谓高并发,不只指的是服务器能够同时支持多的客户端链接,并且这些客户端在链接期间内会不断与服务器有数据来往。网络上常常有各类网络库号称单个服务能同时支持百万甚至千万的并发,而后我实际去看了下,结果发现只是能同时支持不少的链接而已。程序员
若是一个服务器能单纯地接受n个链接(n可能很大),可是不能有条不紊地处理与这些链接之间的数据来往也没有任何意义,这种服务器框架只是“玩具型”的,对实际生产和应用没有任何意义。github
这篇文章将从两个方面来介绍,一个是服务器中的基础的网络通讯部件;另一个是,如何利用这些基础通讯部件整合成一个完整的高效的服务器框架。注意:本文如下内容中的客户端是相对概念,指的是链接到当前讨论的服务程序的终端,因此这里的客户端既多是咱们传统意义上的客户端程序,也多是链接该服务的其余服务器程序。面试
###1、网络通讯部件redis
按上面介绍的思路,咱们先从服务程序的网络通讯部件开始介绍。数据库
####须要解决的问题
既然是服务器程序确定会涉及到网络通讯部分,那么服务器程序的网络通讯模块要解决哪些问题?目前,网络上有不少网络通讯框架,如libevent、boost asio、ACE,但都网络通讯的常见的技术手段都大同小异,至少要解决如下问题:
如何检测有新客户端链接?
如何接受客户端链接?
如何检测客户端是否有数据发来?
如何收取客户端发来的数据?
如何检测链接异常?发现链接异常以后,如何处理?
如何给客户端发送数据?
如何在给客户端发完数据后关闭链接?
稍微有点网络基础的人,都能回答上面说的其中几个问题,好比接收客户端链接用socket API的accept函数,收取客户端数据用recv函数,给客户端发送数据用send函数,检测客户端是否有新链接和客户端是否有新数据能够用IO multiplexing技术(IO复用)的select、poll、epoll等socket API。确实是这样的,这些基础的socket API构成了服务器网络通讯的地基,无论网络通讯框架设计的如何巧妙,都是在这些基础的socket API的基础上构建的。可是如何巧妙地组织这些基础的socket API,才是问题的关键。咱们说服务器很高效,支持高并发,实际上只是一个技术实现手段,无论怎样,从软件开发的角度来说无非就是一个程序而已,因此,只要程序能最大可能地知足“尽可能减小等待或者不等待”这一原则就是高效的,也就是说高效不是“忙的忙死,闲的闲死”,而是你们均可以闲着,可是若是有活要干,你们尽可能一块儿干,而不是一部分忙着依次作事情123456789,另一部分闲在那里无所事事。说的可能有点抽象,下面咱们来举一些例子具体来讲明一下。
例如:
默认状况下,recv函数若是没有数据的时候,线程就会阻塞在那里;
默认状况下,send函数,若是tcp窗口不是足够大,数据发不出去也会阻塞在那里;
connect函数默认链接另一端的时候,也会阻塞在那里;
又或者是给对端发送一份数据,须要等待对端回答,若是对方一直不该答,当前线程就阻塞在这里。
以上都不是高效服务器的开发思惟方式,由于上面的例子都不知足“尽可能减小等待”的原则,为何必定要等待呢?有没用一种方法,这些过程不须要等待,最好是不只不须要等待,并且这些事情完成以后能通知我。这样在这些原本用于等待的cpu时间片内,我就能够作一些其余的事情。有,也就是咱们下文要讨论的IO Multiplexing技术(IO复用技术)。
####几种IO复用机制的比较
目前windows系统支持select、WSAAsyncSelect、WSAEventSelect、完成端口(IOCP),linux系统支持select、poll、epoll。这里咱们不具体介绍每一个具体的函数的用法,咱们来讨论一点深层次的东西,以上列举的API函数能够分为两个层次:
层次一: select和poll
层次二: WSAAsyncSelect、WSAEventSelect、完成端口(IOCP)、epoll
为何这么分呢?先来介绍第一层次,select和poll函数本质上仍是在必定时间内主动去查询socket句柄(多是一个也多是多个)上是否有事件,好比可读事件,可写事件或者出错事件,也就是说咱们仍是须要每隔一段时间内去主动去作这些检测,若是在这段时间内检测出一些事件来,咱们这段时间就算没白花,可是假若这段时间内没有事件呢?咱们只能是作无用功了,说白了,仍是在浪费时间,由于假如一个服务器有多个链接,在cpu时间片有限的状况下,咱们花费了必定的时间检测了一部分socket链接,却发现它们什么事件都没有,而在这段时间内咱们却有一些事情须要处理,那咱们为何要花时间去作这个检测呢?把这个时间用在作咱们须要作的事情很差吗?因此对于服务器程序来讲,要想高效,咱们应该尽可能避免花费时间主动去查询一些socket是否有事件,而是等这些socket有事件的时候告诉咱们去处理。这也就是层次二的各个函数作的事情,它们实际至关于变主动查询是否有事件为当有事件时,系统会告诉咱们,此时咱们再去处理,也就是“好钢用在刀刃”上了。只不过层次二的函数通知咱们的方式是各不相同,好比WSAAsyncSelect是利用windows窗口消息队列的事件机制来通知咱们设定的窗口过程函数,IOCP是利用GetQueuedCompletionStatus返回正确的状态,epoll是epoll_wait函数返回而已。
例如,connect函数链接另一端,若是用于链接socket是非阻塞的,那么connect虽然不能马上链接完成,可是也是会马上返回,无需等待,等链接完成以后,WSAAsyncSelect会返回FD_CONNECT
事件告诉咱们链接成功,epoll会产生EPOLLOUT事件,咱们也能知道链接完成。甚至socket有数据可读时,WSAAsyncSelect产生FD_READ事件,epoll产生EPOLLIN事件,等等。因此有了上面的讨论,咱们就能够获得网络通讯检测可读可写或者出错事件的正确姿式。这是我这里提出的第二个原则:尽可能减小作无用功的时间。这个在服务程序资源够用的状况下可能体现不出来什么优点,可是若是有大量的任务要处理,这里就成了性能的一个瓶颈。
####检测网络事件的正确姿式
根据上面的介绍,第一,为了不无心义的等待时间,第二,不采用主动查询各个socket的事件,而是采用等待操做系统通知咱们有事件的状态的策略。咱们的socket都要设置成非阻塞的。在此基础上咱们回到栏目(一)中提到的七个问题:
如何检测有新客户端链接?
如何接受客户端链接?
默认accept函数会阻塞在那里,若是epoll检测到侦听socket上有EPOLLIN事件,或者WSAAsyncSelect检测到有FD_ACCEPT事件,那么就代表此时有新链接到来,这个时候调用accept函数,就不会阻塞了。固然产生的新socket你应该也设置成非阻塞的。这样咱们就能在新socket上收发数据了。
如何检测客户端是否有数据发来?
如何收取客户端发来的数据?
同理,咱们也应该在socket上有可读事件的时候才去收取数据,这样咱们调用recv或者read函数时不用等待,至于一次性收多少数据好呢?咱们能够根据本身的需求来决定,甚至你能够在一个循环里面反复recv或者read,对于非阻塞模式的socket,若是没有数据了,recv或者read也会马上返回,错误码EWOULDBLOCK会代表当前已经没有数据了。示例:
bool CIUSocket::Recv()
{
int nRet = 0;
while(true)
{
char buff[512];
nRet = ::recv(m_hSocket, buff, 512, 0);
if(nRet == SOCKET_ERROR) //一旦出现错误就马上关闭Socket
{
if (::WSAGetLastError() == WSAEWOULDBLOCK)
break;
else
return false;
}
else if(nRet < 1)
return false;
m_strRecvBuf.append(buff, nRet);
::Sleep(1);
}
return true;
}复制代码
如何检测链接异常?发现链接异常以后,如何处理?
一样当咱们收到异常事件后例如EPOLLERR或关闭事件FD_CLOSE,咱们就知道了有异常产生,咱们对异常的处理通常就是关闭对应的socket。另外,若是send/recv或者read/write函数对一个socket进行操做时,若是返回0,那说明对端已经关闭了socket,此时这路链接也不必存在了,咱们也能够关闭对应的socket。
如何给客户端发送数据?
这也是一道常见的网络通讯面试题,某一年的腾讯后台开发职位就问到过这样的问题。给客户端发送数据,比收数据要稍微麻烦一点,也是须要讲点技巧的。首先咱们不能像注册检测数据可读事件同样一开始就注册检测数据可写事件,由于若是检测可写的话,通常状况下只要对端正常收取数据,咱们的socket就都是可写的,若是咱们设置监听可写事件,会致使频繁地触发可写事件,可是咱们此时并不必定有数据须要发送。因此正确的作法是:若是有数据要发送,则先尝试着去发送,若是发送不了或者只发送出去部分,剩下的咱们须要将其缓存起来,而后再设置检测该socket上可写事件,下次可写事件产生时,再继续发送,若是仍是不能彻底发出去,则继续设置侦听可写事件,如此往复,一直到全部数据都发出去为止。一旦全部数据都发出去之后,咱们要移除侦听可写事件,避免无用的可写事件通知。不知道你注意到没有,若是某次只发出去部分数据,剩下的数据应该暂且存起来,这个时候咱们就须要一个缓冲区来存放这部分数据,这个缓冲区咱们称为“发送缓冲区”。发送缓冲区不只存放本次没有发完的数据,还用来存放在发送过程当中,上层又传来的新的须要发送的数据。为了保证顺序,新的数据应该追加在当前剩下的数据的后面,发送的时候从发送缓冲区的头部开始发送。也就是说先来的先发送,后来的后发送。
如何在给客户端发完数据后关闭链接?
这个问题比较难处理,由于这里的“发送完”不必定是真正的发送完,咱们调用send或者write函数即便成功,也只是向操做系统的协议栈里面成功写入数据,至于可否被发出去、什么时候被发出去很难判断,发出去对方是否收到就更难判断了。因此,咱们目前只能简单地认为send或者write返回咱们发出数据的字节数大小,咱们就认为“发完数据”了。而后调用close等socket API关闭链接。固然,你也能够调用shutdown函数来实现所谓的“半关闭”。关于关闭链接的话题,咱们再单独开一个小的标题来专门讨论一下。
####被动关闭链接和主动关闭链接
在实际的应用中,被动关闭链接是因为咱们检测到了链接的异常事件,好比EPOLLERR,或者对端关闭链接,send或recv返回0,这个时候这路链接已经没有存在必要的意义了,咱们被迫关闭链接。
而主动关闭链接,是咱们主动调用close/closesocket来关闭链接。好比客户端给咱们发送非法的数据,好比一些网络攻击的尝试性数据包。这个时候出于安全考虑,咱们关闭socket链接。
####发送缓冲区和接收缓冲区
上面已经介绍了发送缓冲区了,并说明了其存在的意义。接收缓冲区也是同样的道理,当收到数据之后,咱们能够直接进行解包,可是这样并很差,理由一:除非一些约定俗称的协议格式,好比http协议,大多数服务器的业务的协议都是不一样的,也就是说一个数据包里面的数据格式的解读应该是业务层的事情,和网络通讯层应该解耦,为了网络层更加通用,咱们没法知道上层协议长成什么样子,由于不一样的协议格式是不同的,它们与具体的业务有关。理由二:即便知道协议格式,咱们在网络层进行解包处理对应的业务,若是这个业务处理比较耗时,好比须要进行复杂的运算,或者链接数据库进行帐号密码验证,那么咱们的网络线程会须要大量时间来处理这些任务,这样其它网络事件可能无法及时处理。鉴于以上二点,咱们确实须要一个接收缓冲区,将收取到的数据放到该缓冲区里面去,并由专门的业务线程或者业务逻辑去从接收缓冲区中取出数据,并解包处理业务。
说了这么多,那发送缓冲区和接收缓冲区该设计成多大的容量?这是一个老生常谈的问题了,由于咱们常常遇到这样的问题:预分配的内存过小不够用,太大的话可能会形成浪费。怎么办呢?答案就是像string、vector同样,设计出一个能够动态增加的缓冲区,按需分配,不够还能够扩展。
须要特别注意的是,这里说的发送缓冲区和接收缓冲区是每个socket链接都存在一个。这是咱们最多见的设计方案。
####协议的设计
除了一些通用的协议,如http、ftp协议之外,大多数服务器协议都是根据业务制定的。协议设计好了,数据包的格式就根据协议来设置。咱们知道tcp/ip协议是流式数据,因此流式数据就是像流水同样,数据包与数据包之间没有明显的界限。好比A端给B端连续发了三个数据包,每一个数据包都是50个字节,B端可能先收到10个字节,再收到140个字节;或者先收到20个字节,再收到20个字节,再收到110个字节;也可能一次性收到150个字节。这150个字节能够以任何字节数目组合和次数被B收到。因此咱们讨论协议的设计第一个问题就是如何界定包的界限,也就是接收端如何知道每一个包数据的大小。目前经常使用有以下三种方法:
固定大小,这种方法就是假定每个包的大小都是固定字节数目,例如上文中讨论的每一个包大小都是50个字节,接收端每收气50个字节就当成一个包。
指定包结束符,例如以一个\r\n(换行符和回车符)结束,这样对端只要收到这样的结束符,就能够认为收到了一个包,接下来的数据是下一个包的内容。
指定包的大小,这种方法结合了上述两种方法,通常包头是固定大小,包头中有一个字段指定包
体或者整个大的大小,对端收到数据之后先解析包头中的字段获得包体或者整个包的大小,而后根据这个大小去界定数据的界线。
协议要讨论的第二个问题是,设计协议的时候要尽可能方便解包,也就是说协议的格式字段应该尽可能清晰明了。
协议要讨论的第三个问题是,根据协议组装的单个数据包应该尽可能小,注意这里指的是单个数据包,这样有以下好处:第1、对于一些移动端设备来讲,其数据处理能力和带宽能力有限,小的数据不只能加快处理速度,同时节省大量流量费用;第2、若是单个数据包足够小的话,对频繁进行网络通讯的服务器端来讲,能够大大减少其带宽压力,其所在的系统也能使用更少的内存。试想:假如一个股票服务器,若是一只股票的数据包是100个字节或者1000个字节,那一样是10000只股票区别呢?
协议要讨论的第四个问题是,对于数值类型,咱们应该显式地指定数值的长度,好比long型,在32位机器上是32位4个字节,可是若是在64位机器上,就变成了64位8个字节了。这样一样是一个long型,发送方和接收方可能由于机器位数的不一样会用不一样的长度去解码。因此建议最好,在涉及到跨平台使用的协议最好显式地指定协议中整型字段的长度,好比int3二、int64等等。下面是一个协议的接口的例子,固然java程序员应该很熟悉这样的接口:
class BinaryReadStream
{
private:
const char* const ptr;
const size_t len;
const char* cur;
BinaryReadStream(const BinaryReadStream&);
BinaryReadStream& operator=(const BinaryReadStream&);
public:
BinaryReadStream(const char* ptr, size_t len);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool IsEmpty() const;
bool ReadString(string* str, size_t maxlen, size_t& outlen);
bool ReadCString(char* str, size_t strlen, size_t& len);
bool ReadCCString(const char** str, size_t maxlen, size_t& outlen);
bool ReadInt32(int32_t& i);
bool ReadInt64(int64_t& i);
bool ReadShort(short& i);
bool ReadChar(char& c);
size_t ReadAll(char* szBuffer, size_t iLen) const;
bool IsEnd() const;
const char* GetCurrent() const{ return cur; }
public:
bool ReadLength(size_t & len);
bool ReadLengthWithoutOffset(size_t &headlen, size_t & outlen);
};
class BinaryWriteStream
{
public:
BinaryWriteStream(string* data);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool WriteCString(const char* str, size_t len);
bool WriteString(const string& str);
bool WriteDouble(double value, bool isNULL = false);
bool WriteInt64(int64_t value, bool isNULL = false);
bool WriteInt32(int32_t i, bool isNULL = false);
bool WriteShort(short i, bool isNULL = false);
bool WriteChar(char c, bool isNULL = false);
size_t GetCurrentPos() const{ return m_data->length(); }
void Flush();
void Clear();
private:
string* m_data;
};复制代码
其中BinaryWriteStream是编码协议的类,BinaryReadStream是解码协议的类。能够按下面这种方式来编码和解码。
编码:
std::string outbuf;
BinaryWriteStream writeStream(&outbuf);
writeStream.WriteInt32(msg_type_register);
writeStream.WriteInt32(m_seq);
writeStream.WriteString(retData);
writeStream.Flush();复制代码
解码:
BinaryReadStream readStream(strMsg.c_str(), strMsg.length());
int32_t cmd;
if (!readStream.ReadInt32(cmd))
{
return false;
}
//int seq;
if (!readStream.ReadInt32(m_seq))
{
return false;
}
std::string data;
size_t datalength;
if (!readStream.ReadString(&data, 0, datalength))
{
return false;
}复制代码
###2、服务器程序结构的组织
上面的六个标题,咱们讨论了不少具体的细节问题,如今是时候讨论将这些细节组织起来了。根据个人我的经验,目前主流的思想是one thread one loop+reactor模式(也有proactor模式)的策略。通俗点说就是一个线程一个循环,即在一个线程的函数里面不断地循环依次作一些事情,这些事情包括检测网络事件、解包数据产生业务逻辑。咱们先从最简单地来讲,设定一些线程在一个循环里面作网络通讯相关的事情,伪码以下:
while(退出标志)
{
//IO复用技术检测socket可读事件、出错事件
//(若是有数据要发送,则也检测可写事件)
//若是有可读事件,对于侦听socket则接收新链接;
//对于普通socket则收取该socket上的数据,收取的数据存入对应的接收缓冲区,若是出错则关闭链接;
//若是有数据要发送,有可写事件,则发送数据
//若是有出错事件,关闭该链接
}复制代码
另外设定一些线程去处理接收到的数据,并解包处理业务逻辑,这些线程能够认为是业务线程了,伪码以下:
//从接收缓冲区中取出数据解包,分解成不一样的业务来处理 复制代码
上面的结构是目前最通用的服务器逻辑结构,可是能不能再简化一下或者说再综合一下呢?咱们试试,你想过这样的问题没有:假如如今的机器有两个cpu(准确的来讲应该是两个核),咱们的网络线程数量是2个,业务逻辑线程也是2个,这样可能存在的状况就是:业务线程运行的时候,网络线程并无运行,它们必须等待,若是是这样的话,干吗要多建两个线程呢?除了程序结构上可能稍微清楚一点,对程序性能没有任何实质性提升,并且白白浪费cpu时间片在线程上下文切换上。因此,咱们能够将网络线程与业务逻辑线程合并,合并后的伪码看起来是这样子的:
while(退出标志)
{
//IO复用技术检测socket可读事件、出错事件
//(若是有数据要发送,则也检测可写事件)
//若是有可读事件,对于侦听socket则接收新链接;
//对于普通socket则收取该socket上的数据,收取的数据存入对应的接收缓冲区,若是出错则关闭链接;
//若是有数据要发送,有可写事件,则发送数据
//若是有出错事件,关闭该链接
//从接收缓冲区中取出数据解包,分解成不一样的业务来处理
}复制代码
你没看错,其实就是简单的合并,合并以后和不只能够达到原来合并前的效果,并且在没有网络IO事件的时候,能够及时处理咱们想处理的一些业务逻辑,而且减小了没必要要的线程上下文切换时间。
咱们再更进一步,甚至咱们能够在这个while循环增长其它的一些任务的处理,好比程序的逻辑任务队列、定时器事件等等,伪码以下:
while(退出标志)
{
//定时器事件处理
//IO复用技术检测socket可读事件、出错事件
//(若是有数据要发送,则也检测可写事件)
//若是有可读事件,对于侦听socket则接收新链接;
//对于普通socket则收取该socket上的数据,收取的数据存入对应的接收缓冲区,若是出错则关闭链接;
//若是有数据要发送,有可写事件,则发送数据
//若是有出错事件,关闭该链接
//从接收缓冲区中取出数据解包,分解成不一样的业务来处理
//程序自定义任务1
//程序自定义任务2
}复制代码
注意:之因此将定时器事件的处理放在网络IO事件的检测以前,是由于避免定时器事件过时时间太长。假如放在后面的话,可能前面的处理耗费了一点时间,等处处理定时器事件时,时间间隔已通过去了很多时间。虽然这样处理,也无法保证定时器事件百分百精确,可是能尽可能保证。固然linux系统下提供eventfd这样的定时器对象,全部的定时器对象就能像处理socket这样的fd同样统一成处理。这也是网络库libevent的思想很像,libevent将socket、定时器、信号封装成统一的对象进行处理。
说了这么多理论性的东西,咱们来一款流行的开源网络库muduo来讲明吧(做者:陈硕),原库是基于boost的,我改为了C++11的版本,并修改了一些bug,在此感谢原做者陈硕。
上文介绍的核心线程函数的while循环位于eventloop.cpp中:
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}复制代码
poller_->poll
利用epoll分离网络事件,而后接着处理分离出来的网络事件,每个客户端socket对应一个链接,即一个TcpConnection和Channel通道对象。currentActiveChannel->handleEvent(pollReturnTime)根据是可读、可写、出错事件来调用对应的处理函数,这些函数都是回调函数,程序初始化阶段设置进来的:
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
//当是侦听socket时,readCallback_指向Acceptor::handleRead
//当是客户端socket时,调用TcpConnection::handleRead
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
//若是是链接状态服的socket,则writeCallback_指向Connector::handleWrite()
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}复制代码
固然,这里利用了Channel对象的“多态性”,若是是普通socket,可读事件就会调用预先设置的回调函数;可是若是是侦听socket,则调用Aceptor对象的handleRead()
来接收新链接:
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
//newConnectionCallback_实际指向TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of livev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }复制代码
主循环里面的业务逻辑处理对应:
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
[cpp] view plain copy
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}复制代码
这里增长业务逻辑是增长执行任务的函数指针的,增长的任务保存在成员变量pendingFunctors_
中,这个变量是一个函数指针数组(vector对象),执行的时候,调用每一个函数就能够了。上面的代码先利用一个栈变量将成员变量pendingFunctors_
里面的函数指针换过来,接下来对这个栈变量进行操做就能够了,这样减小了锁的粒度。由于成员变量pendingFunctors_
在增长任务的时候,也会被用到,设计到多个线程操做,因此要加锁,增长任务的地方是:
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}复制代码
而frameFunctor_就更简单了,就是经过设置一个函数指针就能够了。固然这里有个技巧性的东西,即增长任务的时候,为了可以当即执行,使用唤醒机制,经过往一个fd里面写入简单的几个字节,来唤醒epoll,使其马上返回,由于此时没有其它的socke有事件,这样接下来就执行刚才添加的任务了。
咱们看一下数据收取的逻辑:
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
//messageCallback_指向CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}复制代码
将收到的数据放到接收缓冲区里面,未来咱们来解包:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
while (true)
{
//不够一个包头大小
if (pBuffer->readableBytes() < (size_t)sizeof(msg))
{
LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
return;
}
//不够一个整包大小
msg header;
memcpy(&header, pBuffer->peek(), sizeof(msg));
if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
return;
pBuffer->retrieve(sizeof(msg));
std::string inbuf;
inbuf.append(pBuffer->peek(), header.packagesize);
pBuffer->retrieve(header.packagesize);
if (!Process(conn, inbuf.c_str(), inbuf.length()))
{
LOG_WARN << "Process error, close TcpConnection";
conn->forceClose();
}
}// end while-loop
}复制代码
先判断接收缓冲区里面的数据是否够一个包头大小,若是够再判断够不够包头指定的包体大小,若是仍是够的话,接着在Process函数里面处理该包。
再看看发送数据的逻辑:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}复制代码
若是剩余的数据remaining大于则调用channel_->enableWriting();开始监听可写事件,可写事件处理以下:
[cpp] view plain copy
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}复制代码
若是发送完数据之后调用channel_->disableWriting();移除监听可写事件。
不少读者可能一直想问,文中不是说解包数据并处理逻辑是业务代码而非网络通讯的代码,你这里貌似都混在一块儿了,其实没有,这里实际的业务代码处理都是框架曾提供的回调函数里面处理的,具体怎么处理,由框架使用者——业务层本身定义。
总结起来,实际上就是一个线程函数里一个loop那么点事情,不信你再看我曾经工做上的一个交易系统服务器项目代码:
void CEventDispatcher::Run()
{
m_bShouldRun = true;
while(m_bShouldRun)
{
DispatchIOs();
SyncTime();
CheckTimer();
DispatchEvents();
}
}
void CEpollReactor::DispatchIOs()
{
DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
if (HandleOtherTask())
{
dwSelectTimeOut = 0;
}
struct epoll_event ev;
CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
for(; itor!=m_mapEventHandlerId.end(); itor++)
{
CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
if(pEventHandler == NULL){
continue;
}
ev.data.ptr = pEventHandler;
ev.events = 0;
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nReadID > 0)
{
ev.events |= EPOLLIN;
}
if (nWriteID > 0)
{
ev.events |= EPOLLOUT;
}
epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
}
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);
for (int i=0; i<nfds; i++)
{
struct epoll_event &evref = events[i];
CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
if ((evref.events|EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleInput();
}
if ((evref.events|EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleOutput();
}
}
}
void CEventDispatcher::DispatchEvents()
{
CEvent event;
CSyncEvent *pSyncEvent;
while(m_queueEvent.PeekEvent(event))
{
int nRetval;
if(event.pEventHandler != NULL)
{
nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
else
{
nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
if(event.pAdd != NULL) //同步消息
{
pSyncEvent=(CSyncEvent *)event.pAdd;
pSyncEvent->nRetval = nRetval;
pSyncEvent->sem.UnLock();
}
}
}复制代码
再看看蘑菇街开源的TeamTalk的源码(代码下载地址:github.com/baloonwj/Te…
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
{
Sleep(MIN_TIMER_DURATION);
continue;
}
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnRead();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnWrite();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnClose();
pSocket->ReleaseRef();
}
}
}
}复制代码
再看filezilla,一款ftp工具的服务器端,它采用的是Windows的WSAAsyncSelect模型(代码下载地址:
github.com/baloonwj/fi…):
//Processes event notifications sent by the sockets or the layers
static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
if (message>=WM_SOCKETEX_NOTIFY)
{
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage
{
//Lookup socket and verify if it's valid CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket; SOCKET hSocket = wParam; if (!pSocket) return 0; if (hSocket == INVALID_SOCKET) return 0; if (pSocket->m_SocketData.hSocket != hSocket) return 0; int nEvent = lParam & 0xFFFF; int nErrorCode = lParam >> 16; //Dispatch notification if (!pSocket->m_pFirstLayer) { //Dispatch to CAsyncSocketEx instance switch (nEvent) { case FD_READ: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_READ; break; } else if (pSocket->GetState() == attached) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; // Ignore further FD_READ events after FD_CLOSE has been received if (pSocket->m_SocketData.onCloseCalled) break; #endif //NOSOCKETSTATES #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { pSocket->OnReceive(nErrorCode); } break; case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
{
if (nErrorCode && pSocket->m_SocketData.nextAddr)
{
if (pSocket->TryNextProtocol())
break;
}
pSocket->SetState(connected);
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != listening && pSocket->GetState() != attached)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_ACCEPT)
pSocket->OnAccept(nErrorCode);
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != connected && pSocket->GetState() != attached)
break;
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
pSocket->m_SocketData.onCloseCalled = true;
pSocket->OnReceive(WSAESHUTDOWN);
break;
}
}
pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
break;
}
}
else //Dispatch notification to the lowest layer
{
if (nEvent == FD_READ)
{
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
return 0;
DWORD nBytes;
if (!pSocket->IOCtl(FIONREAD, &nBytes))
nErrorCode = WSAGetLastError();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (nEvent == FD_CLOSE)
{
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
return 0;
}
}
pSocket->m_SocketData.onCloseCalled = true;
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
}
return 0;
}
else if (message == WM_USER) //Notification event sent by a layer
{
//Verify parameters, lookup socket and notification message
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
{
return 0;
}
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket)
{
delete pMsg;
return 0;
}
int nEvent=pMsg->lEvent&0xFFFF;
int nErrorCode=pMsg->lEvent>>16;
//Dispatch to layer
if (pMsg->pLayer)
pMsg->pLayer->CallEvent(nEvent, nErrorCode);
else
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_WRITE: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_WRITE) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnSend(nErrorCode); } break; case FD_CONNECT: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting) pSocket->SetState(connected); else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_CONNECT) pSocket->OnConnect(nErrorCode); #ifndef NOSOCKETSTATES if (!nErrorCode) { if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE)) pSocket->OnSend(0); } pSocket->m_nPendingEvents = 0; #endif //NOSOCKETSTATES break; case FD_ACCEPT: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT)) #endif //NOSOCKETSTATES { pSocket->OnAccept(nErrorCode); } break; case FD_CLOSE: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE)) { pSocket->SetState(nErrorCode?aborted:closed); #else { #endif //NOSOCKETSTATES pSocket->OnClose(nErrorCode); } break; } } delete pMsg; return 0; } else if (message == WM_USER+1) { // WSAAsyncGetHostByName reply // Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (!pWnd) return 0; CAsyncSocketEx *pSocket = NULL; for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) { pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket; if (pSocket && pSocket->m_hAsyncGetHostByNameHandle && pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam && pSocket->m_pAsyncGetHostByNameBuffer) break; } if (!pSocket || !pSocket->m_pAsyncGetHostByNameBuffer) return 0; int nErrorCode = lParam >> 16; if (nErrorCode) { pSocket->OnConnect(nErrorCode); return 0; } SOCKADDR_IN sockAddr{}; sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr; sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort); BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr)); delete [] pSocket->m_pAsyncGetHostByNameBuffer; pSocket->m_pAsyncGetHostByNameBuffer = 0; pSocket->m_hAsyncGetHostByNameHandle = 0; if (!res) if (GetLastError() != WSAEWOULDBLOCK) pSocket->OnConnect(GetLastError()); return 0; } else if (message == WM_USER + 2) { //Verify parameters, lookup socket and notification message //Verify parameters if (!hWnd) return 0; CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); if (!pWnd) return 0; if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage return 0; CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket; if (!pSocket) return 0; // Process pending callbacks std::list<t_callbackMsg> tmp; tmp.swap(pSocket->m_pendingCallbacks); pSocket->OnLayerCallback(tmp); for (auto & cb : tmp) { delete [] cb.str; } } else if (message == WM_TIMER) { if (wParam != 1) return 0; ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd && pWnd->m_pThreadData); if (!pWnd || !pWnd->m_pThreadData) return 0; if (pWnd->m_pThreadData->layerCloseNotify.empty()) { KillTimer(hWnd, 1); return 0; } CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front(); pWnd->m_pThreadData->layerCloseNotify.pop_front(); if (pWnd->m_pThreadData->layerCloseNotify.empty()) KillTimer(hWnd, 1); if (socket) PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE); return 0; } return DefWindowProc(hWnd, message, wParam, lParam); }复制代码
上面截取的代码段,若是你对这些项目不是很熟悉的话,估计你也没有任何兴趣去细细看每一行代码逻辑。可是你必定要明白我所说的这个结构的逻辑,基本上目前主流的网络框架都是这套原理。好比filezilla的网络通讯层一样也被用在大名鼎鼎的电驴(easyMule)中。
关于单个服务程序的框架,我已经介绍完了,若是你能彻底理解我要表达的意思,我相信你也能构建出一套高性能服务程序来。
另外,服务器框架也能够在上面的设计思路的基础上增长不少有意思的细节,好比流量控制。举另外 一个我实际作过的项目中的例子吧:
通常实际项目中,当客户端链接数目比较多的时候,服务器在处理网络数据的时候,若是同时有多个socket上有数据要处理,因为cpu核数有限,根据上面先检测iO事件再处理IO事件可能会出现工做线程一直处理前几个socket的事件,直到前几个socket处理完毕后再处理后面几个socket的数据。这就至关于,你去饭店吃饭,你们都点了菜,可是有些桌子上一直在上菜,而有些桌子上一直没有菜。这样确定很差,咱们来看下如何避免这种现象:
int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)
{
//NET_IO_LOG0("CFtdEngine::HandlePackage\n");
FTDC_PACKAGE_DEBUG(pFTDCPackage);
if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)
{
if (!IsSessionLogin(pSession->GetSessionID()))
{
SendErrorRsp(pFTDCPackage, pSession, 1, "客户未登陆");
return 0;
}
}
CalcFlux(pSession, pFTDCPackage->Length()); //统计流量
REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "登陆请求%0x", pFTDCPackage->GetTID());
int nRet = 0;
switch(pFTDCPackage->GetTID())
{
case FTD_TID_ReqUserLogin:
///huwp:20070608:检查太高版本的API将被禁止登陆
if (pFTDCPackage->GetVersion()>FTD_VERSION)
{
SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");
return 0;
}
nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqCheckUserLogin:
nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqSubscribeTopic:
nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
}
return 0;
}复制代码
当有某个socket上有数据可读时,接着接收该socket上的数据,对接收到的数据进行解包,而后调用CalcFlux(pSession, pFTDCPackage->Length())进行流量统计:
void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)
{
TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());
if (pSessionInfo != NULL)
{
//流量控制改成计数
pSessionInfo->nCommFlux ++;
///若流量超过规定,则挂起该会话的读操做
if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)
{
pSession->SuspendRead(true);
}
}
}复制代码
该函数会先让某个链接会话(Session)处理的包数量递增,接着判断是否超过最大包数量,则设置读挂起标志:
void CSession::SuspendRead(bool bSuspend)
{
m_bSuspendRead = bSuspend;
}复制代码
这样下次将会从检测的socket列表中排除该socket:
void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)
{
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nWriteID != 0 && nReadID ==0)
{
nReadID = nWriteID;
}
if (nReadID != 0)
{
m_mapEventHandlerId[pEventHandler] = nReadID;
struct epoll_event ev;
ev.data.ptr = pEventHandler;
if(epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)
{
perror("epoll_ctl EPOLL_CTL_ADD");
}
}
}
void CSession::GetIds(int *pReadId, int *pWriteId)
{
m_pChannelProtocol->GetIds(pReadId,pWriteId);
if (m_bSuspendRead)
{
*pReadId = 0;
}
}复制代码
也就是说再也不检测该socket上是否有数据可读。而后在定时器里1秒后重置该标志,这样这个socket上有数据的话又能够从新检测到了:
const int SESSION_CHECK_TIMER_ID = 9;
const int SESSION_CHECK_INTERVAL = 1000;
SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);
void CFrontEngine::OnTimer(int nIDEvent)
{
if (nIDEvent == SESSION_CHECK_TIMER_ID)
{
CSessionMap::iterator itor = m_mapSession.Begin();
while (!itor.IsEnd())
{
TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());
if (pFind != NULL)
{
CheckSession(*itor, pFind);
}
itor++;
}
}
}
void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)
{
///从新开始计算流量
pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;
if (pSessionInfo->nCommFlux < 0)
{
pSessionInfo->nCommFlux = 0;
}
///若流量超过规定,则挂起该会话的读操做
pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}复制代码
这就至关与饭店里面先给某一桌客人上一些菜,让他们先吃着,等上了一些菜以后不会再给这桌继续上菜了,而是给其它空桌上菜,你们都吃上后,继续回来给原先的桌子继续上菜。实际上咱们的饭店都是这么作的。上面的例子是单服务流量控制的实现的一个很是好的思路,它保证了每一个客户端都能均衡地获得服务,而不是一些客户端等好久才有响应。固然,这样的技术不能适用于有顺序要求的业务,例如销售系统,这些系统通常是先下单先获得的。
另外如今的服务器为了加快IO操做,大量使用缓存技术,缓存其实是以空间换取时间的策略。对于一些反复使用的,可是不常常改变的信息,若是从原始地点加载这些信息就比较耗时的数据(好比从磁盘中、从数据库中),咱们就可使用缓存。因此时下像redis、leveldb、fastdb等各类内存数据库大行其道。若是你要从事服务器开发,你至少须要掌握它们中的几种。
这是我在gitchat上的首篇文章,限于篇幅有限,不少细节不可能展开来叙述,同时这里就再也不讲述分布式的服务器的设计技巧了,后面若是条件容许会给你们带来更多的技术分享。同时感谢gitchat提供这样一个与你们交流的平台。
鉴于笔者能力和经验有限,文中不免有错漏之处,欢迎提意见。