【原创】基于ACE Proactor框架下高并发、大容量吞吐程序设计既最近的一个产品开发总结

ReactorProactor

基本概念

在高性能的I/O设计中,有两个比较著名的模式ReactorProactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操做。程序员

       在比较这两个模式以前,咱们首先的搞明白几个概念,编程

    • 什么是阻塞和非阻塞

阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操做的就绪状态来采起的不一样方式,说白了是一种读取或者写入操做函数的实现方式。windows

阻塞方式下读取或者写入函数将一直等待。设计模式

非阻塞方式下,读取或者写入函数会当即返回一个状态值。缓存

    • 什么是同步和异步

同步和异步是针对应用程序和内核的交互而言的。服务器

同步指的是用户进程触发IO操做并等待或者轮询的去查看IO操做是否就绪。网络

异步是指用户进程触发IO操做之后便开始作本身的事情,而当IO操做已经完成的时候会获得IO完成的通知。数据结构

通常来讲I/O模型能够分为:同步阻塞,同步非阻塞,异步阻塞,异步非阻塞多线程

让咱们来看一下每种不一样I/O模型的具体描述并发

    •    同步阻塞 IO

   在此种方式下,用户进程在发起一个IO操做之后,必须等待IO操做的完成,只有当真正完成了IO操做之后,用户进程才能运行。JAVA传统的IO模型属于此种方式!

    •    同步非阻塞IO:

在此种方式下,用户进程发起一个IO操做之后边可返回作其它事情,可是用户进程须要时不时的询问IO操做是否就绪,这就要求用户进程不停的去询问,从而引入没必要要的CPU资源浪费。其中目前JAVANIO就属于同步非阻塞IO

    •    异步阻塞IO

此种方式下,应用发起一个IO操做之后,不等待内核IO操做的完成,等内核完成IO操做之后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为何说是阻塞的呢?由于此时是经过select系统调用来完成的,而select函数自己的实现方式是阻塞的,而采用select函数有个好处就是它能够同时监听多个文件句柄,从而提升系统的并发性!

    •    异步非阻塞IO:

此种方式下,用户进程只须要发起一个IO操做而后当即返回,等IO操做真正的完成之后,应用程序会获得IO操做完成的通知,此时用户进程只须要对数据进行处理就行了,不须要进行实际的IO读写操做,由于真正的IO读取或者写入操做已经由内核完成了。目前Java中尚未支持此种IO模型。   

阻塞型I/O意味着控制权只有到调用操做结束后才会回到调用者手里.结果调用者被阻塞了,这段时间了作不了任何其它事情。 更郁闷的是,在等待IO结果的时间里,调用者所在线程此时没法腾出手来去响应其它的请求,这真是太浪费资源了。拿read() 操做来讲吧,调用此函数的代码会一直僵在此处直至它所读的socket缓存中有数据到来。

相比之下,非阻塞同步是会当即返回控制权给调用者的。调用者不须要等等,它从调用的函数获取两种结果:要么这次调用成功进行了;要么系统返回错误标识告诉调用者当前资源不可用,你再等等或者再试一次看吧。好比read()操做,若是当前socket无数据可读,则当即返回EWOULBLOCK/EAGAIN,告诉调用read()"数据还没准备好,你稍后再试".

在非阻塞异步调用中,稍有不一样。调用函数在当即返回时,还告诉调用者,此次请求已经开始了。系统会使用另外的资源或者线程来完成此次调用操做,并在完成的时候知会调用者(好比经过回调函数)。拿WindowsReadFile()或者POSIXaio_read()来讲,调用它以后,函数当即返回,操做系统在后台同时开始读操做。

在以上三种IO形式中,非阻塞异步是性能最高、伸缩性最好的。搞清楚了以上概念之后,咱们再回过头来看看,Reactor模式和Proactor模式。

此文详细的阐述了基于TCP高性能的GOLDEN数据服务器模块的设计以及解决方案 ,咱们在文章的后面就再也不说起阻塞式的方案了,由于阻塞式I/O实在是缺乏可伸缩性,性能也达不到高性能服务器的要求。

两种IO多路复用方案:ReactorProactor

通常状况下,I/O复用机制须要事件分离器(event demultiplexor ). 事件分离器的做用,就是将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊:谁的什么东西送了,快来拿吧。开发人员在开始的时候须要在事件分离器那里注册感兴趣的事件,并提供相应的事件处理器(event handlers),或者是回调函数;事件分离器在适当的时候会将请求的事件分发给这些handler或者回调函数。

涉及到事件分离器的两种模式称为:ReactorProactorReactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。在Reactor模式中,事件分离者等待某个事件或者是应用或者是某个操做的状态发生(好比文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理器或者事件处理函数或者回调函数,由后者来作实际的读写操做。

而在Proactor模式中,事件处理器(或者由事件分离器代为)直接发起一个异步读写操做(至关于请求)而实际的工做是由操做系统来完成的。发起时,须要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离器得知了这个请求,它默默等待这个请求的完成,而后转发完成事件给相应的事件处理器或者事件处理函数或者回调。举例来讲,在Windows上事件处理器投递了一个异步IO操做(称有overlapped的技术),事件分离器等IOCompletion事件完成 ,这种异步模式的典型实现是基于操做系统底层异步API的,因此咱们可称之为“系统级别”的或者“真正意义上”的异步,由于具体的读写是由操做系统代劳的。

举另外个例子来更好地理解ReactorProactor两种模式的区别。这里咱们只关注read操做,由于write操做也是差很少的。下面是Reactor的作法:

  1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

  2. 事件分离者等着这个事件的发生;

  3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

  4. 事件处理器收到消息,因而去那个socket上读数据了. 若是须要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何作的:

  1. 事件处理器直接投递发一个读操做(固然,操做系统必须支持这个异步操做)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操做的完成事件。这个事件处理器很拽,发个命令就无论具体的事情了,只等着别人系统)帮他搞定的时候给他回个话。

  2. 事件分离器等着这个读事件的完成(比较下与Reactor的不一样);

  3. 当事件分离器默默等待完成事情到来的同时,操做系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

  4. 事件分离器通知以前的事件处理器: 你吩咐的事情搞定了;

  5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。若是有须要,事件处理器还像以前同样发起另一个读操做,和上面的几个步骤同样。

现行作法

开源C++开发框架ACEAdaptive Communication Enviromen 提供了大量平台独立的底层并发支持类(线程、互斥量等).同时在更高一层它也提供了独立的几组C++类,用于实现ReactorProactor模式。 尽管它们都是平台独立的单元,但他们都提供了不一样的接口.

ACE ProactorMS-Windows上不管是性能还在健壮性都更胜一筹,这主要是因为Windows提供了一系列高效的底层异步API

不幸的是,并非全部操做系统都为底层异步提供健壮的支持。举例来讲,许多Unix系统就有麻烦. ACE中的ProactorUnix上是使用Posix标准实现的异步操做,Posix中有一个AIOProactor使用AIO实现异步传输。但Linux2.6之前版本中不支持AIO,而在2.6版本之后,部分支持AIO。就由于这个部分支持,因此,Posix的子类不能正常工做。所以,ACE Reactor多是Unix系统上更合适的解决方案.正由于系统底层的支持力度不一,为了在各系统上有更好的性能,开发者不得不维护独立的好几份代码:Windows准备的ACE Proactor以及为Unix系列提供的ACE Reactor

就像咱们提到过的,真正的异步模式须要操做系统级别的支持。因为事件处理器及操做系统交互的差别,为ReactorProactor设计一种通用统一的外部接口是很是困难的。这也是设计通行开发框架的难点所在。

ACE Proactor 框架

怎样发送和接收数据

ACEProactor框架包含了一组高度相关的类,其数量相对较多,我在进行如下描述的时候不可能按照顺序讨论它们,而又不进行提早引用。到最后我会描述完全部这些类。下面这些类给出了ACE Proactor框架的各个类以及它们之间的关系。能够把这个图1-1看成描述ACE Proactor框架实际应用的范本。注意:类名中以ACE_开始的类名称是ACE Procator框架中包含的类,而以golden_开始的类名称是实际应用范本提供的类。

下面的代码声明了一个类,它所完成的基本工做是处理接收和发送数据。

1.1 ACE Proactor框架中的类

#include "ace/Asynch_IO.h"

class golden_aio_handler : public ACE_Service_Handler

{

public :

golden_aio_handler (golden_aio_acceptor *acc = 0) ;

virtual void open ( ACE_HANDLE new_handle,

ACE_Message_Block &message_block ) ;

virtual void handle_read_stream(

const ACE_Asynch_Read_Stream::Result &result);

virtual void handle_write_stream(

const ACE_Asynch_Write_Stream::Result &result);

private:

ACE_Asynch_Read_Stream reader_;

ACE_Asynch_Write_Stream writer_;

} ;

这段代码首先包含了一些必需的头文件,以引入这个例子使用的ACE Proactor框架类:

  1. ACE_Service_Handler Proactor框架中建立事件处理器所用的目标类 。

  2. ACE_Handler ACE_Service_Handler的父类,定义了经过ACE_Proactor框架处理异步I/O完成事件所须要的接口。

  3. ACE_Asynch_Read_Stream 用于在已经链接的TCP/IP socket上发起读操做的I/0工厂类。

  4. ACE_Asynch_Write_Stream 用于在已经链接的TCP/IP socket上发起写操做的I/0工厂类。

  5. Result 每一个I/O工厂类都把Result定义为嵌在本身内部的类,用以保存该工厂发起的每一个操做的结果。全部的Result类都从ACE_Asynch_Result派生,而且增长了专用于它们所针对的I/O类型的数据和方法。由于每一个异步I/O操做的发起和完成都是分离的、不一样的事情,须要有一种机制来“记住”操做的参数,而且连同结果一块儿吧这些参数转交给完成处理器。

设置事件处理器并发起I/O

TCP链接打开时,咱们应该把新socket的句柄传给事件处理器对象,在这个例子中是golden_aio_handler。把句柄放在事件处理器里是有益的,缘由以下:

    1. 它是socket的生命期一个方便的控制点,由于它是链接工厂的目标。

    2. I/O操做最有可能从这个类发起。

在使用ACE_Proactor框架的异步链接创建类时golden_aio_handler::open()挂钩方法会在新链接创建时被调用。下面是咱们程序中的open()挂钩:

void

golden_aio_handler::open(ACE_HANDLE new_handle, ACE_Message_Block &)

{

this->handle(new_handle);

//打开异步读写

reader_.open (*this, new_handle, 0, proactor ());

writer_.open (*this, new_handle, 0, proactor ());

//准备读的缓冲区

ACE_NEW_NORETURN(mblk_, ACE_Message_Block (SIZEOF_HEADER_WITH_CRC));

if (reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) <0)

{

delete this ;

}

}

在一开始,咱们使用继承而获得的ACE_Handler::handle()方法保存新socket的句柄。该方法把句柄存储在一个方便的地方,以便在析构函数~golden_aio_handler()访问或者用于其余用途。这是在这个类中实现的socket句柄生命期管理的一部分。

要发起I/O,必须初始化所需的I/O工厂对象。在存储了socket句柄以后,open()方法会初始化reader_writer_ I/O工厂对象,为发起I/O操做作准备。两个类的open()方法都是同样的:

int open (ACE_Handler &handler,

ACE_HANDLE handle = ACE_INVALID_HANDLE,

const void *completion_key = 0,

ACE_Proactor *proactor = 0);

第一个参数表示工厂对象所发起的操做的完成事件处理器里。当经过工厂对象发起的I/O操做完成时,ACE_Proactor框架会回调这个对象。这也是为何该处理器对象叫作完成事件处理器的缘由。在咱们的程序中,golden_aio_handler对象是ACE_Handler的后代,便是读操做也是写操做的完成事件处理器,因此*this被用做处理器参数。handle是新传入的socket句柄,completion_key参数只适用于windows默认传入0便可,proactor参数会传入一个在进程范围的ACE_Procator单体对象。

程序中的open()挂钩方法所作的最后一件事情,是调用ACE_Asynch_Read_Stream::read()方法,从而在新的socket上发起一个读操做。ACE_Asynch_Read_Stream::read()函数以下:

int read (ACE_Message_Block &message_block,

size_t num_bytes_to_read,

const void *act = 0,

int priority = 0,

int signal_number = ACE_SIGRTMIN);

为传输指定一个ACE_Message_Block ,使得缓冲区管理变得更为容易,由于能够利用ACE_Message_Block的各类能力,以及它与ACE的其余部分的集成。在发起读操做时,数据会被读入开始于数据块的写指针处在的块中,由于要被读取的数据将被写入块中。

完成I/O操做

ACE_Proactor框架是基于事件的框架。I/O工厂登记“每一个操做”与“该操做完成时应回调的完成事件处理器”之间创建关联。当读取完成时,ACE_Proactor框架会调用ACE_Handler::handle_read_stream()挂钩方法:

void golden_aio_handler::handle_read_stream(

const ACE_Asynch_Read_Stream::Result &result)

{ ACE_Asynch_Read_Stream::Result &result

if (!result.success () || result.bytes_transferred () == 0)

delete this;

else if (result.bytes_transferred () < result.bytes_to_read ())

{

if (reader_.read (*mblk_, result.bytes_to_read () - result.bytes_transferred ()) < 0)

delete this ;

}

else if (mblk_->length () == SIZEOF_HEADER_WITH_CRC)

handle_msg_header();

else

{

if (handle_msg_pack()<0)

delete this ;

}

}

传入的ACE_Asynch_Read_Stream::Result指向的是用于保存读取操做结果的对象。每一个I/O工厂类都会定义本身的Result类,即用于保存每一个操做发起时所用的参数,又用于保存操做的结果。

若是读操做读取了任何数据,处理接收read到的报文数据包用handle_msg_pack函数,而后发起一个写操做,把数据处理结果返回给对端。当写操做完成时,ACE_Proactor框架调用下面的handle_write_stream方法:

void golden_aio_handler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)

{

if(reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) < 0)

delete this;

}

无论写操做是否成功完成,在该操做中使用的消息块都会释放。若是socket出了问题,先前发起的读操做也会完成并出错,而handle_read_stream()会清理对象和socket句柄。

1-2给出了本程序事件序列。

创建链接

ACE提供里两个工厂类,用于经过ACE_Proactor框架前摄式地创建TCP/IP链接:

ACE_Asynch_Acceptor , 用于发起被动的链接创建

ACE_Asynch_Connector , 用于发起主动的链接创建

1.2 ACE Proactor异步回调序列图

当使用其中一个类创建TCP/IP链接时,ACE_Proactor框架会建立一个从ACE_Service_Handler派生的事件服务处理器,好比golden_aio_handler,用以处理新链接。ACE_Service_Handler类是

ACE_Proactor框架中全部用异步方式链接的服务的基类,从ACE_Handler派生,因此服务类也能够处理在服务中发起的I/O操做的完成.

ACE_Asynch_Acceptor是一个至关容易使用的类,它的一个挂钩方法是一个protected虚方法:make_handler()Proactor框架调用这个方法获取一个ACE_Service_Handler对象,用觉得新链接提供服务。下面的代码说明了这种状况:

golden_aio_handler * golden_aio_acceptor::make_handler (void)

{

///来一个链接,就新增一个句柄。在线程池中处理

golden_aio_handler *ih;

ACE_NEW_RETURN (ih, golden_aio_handler (this), 0);

if (clients_.insert (ih) == -1)

{

delete ih ;

return NULL ;

}

return ih;

}

return 0 ;

}

ACE_Proactor完成多路分离器

ACE_Proactor类负责驱动ACE_Proactor框架的完成处理,这个类等待完成事件的发生、把这些事件多路分离给相关联的完成事件处理器,并分派每一个完成处理器上适当的挂钩方法。所以,要让异步I/O完成事件处理器得以发生----不管是I/O仍是链接创建----Golden Server中都必须运行前摄器的时间循环。这一般很简单,只须要把下面的代码插入到应用中就能够了:

int golden_aio::svc()

{

ACE_Proactor::instance()->proactor_run_event_loop ();

return 1 ;

}

能够经过两种方式来使用ACE_Proactor,如上所示的程序代码instance(),做为单体来使用。也能够经过实例化一个或多个实例来使用。这个能力被用于在一个进程中支持多个前摄器。以下代码所示,这是应用于镜像发送和镜像接收的前摄器

int golden_mirror_sender::svc()

{

proactor_sender_->proactor_run_event_loop ();

return 1 ;

}

int golden_mirror_receiver::svc()

{

proactor_recviver_->proactor_run_event_loop ();

return 1 ;

}

各类操做系统上的异步I/O设施会有很大的不一样,为了在全部这些系统上维持统一的接口和编程方法,ACE_Proactor类使用了Bridge模式来维持灵活性和可扩展性,同时还使得ACE_Proactor框架可以使用不一样的异步I/O实现。

ACE_WIN32_Proactor类是Windows上的ACE_Proactor实现。使用了I/O完成端口进行完成事件检测。在初始化异步操做工厂时,I/O句柄与前摄器的I/O完成端口被关联在一块儿。在这种实现中,windows下的GetQueuedCompletionStatus()函数负责执行事件循环,以下程序代码

int golden_server::create_proactor()

{

ACE_Proactor::instance()->close_singleton();

impl_ = new ACE_WIN32_Proactor(0,1);

ACE_Proactor::instance(new ACE_Proactor(impl_,1),1) ;

return 0;

}

int golden_server::create_proactor_mirror_recviver()

{

ACE_NEW_RETURN(impl_mirror_recviver_, ACE_WIN32_Proactor(0,1),-1);

ACE_NEW_RETURN(mirror_recviver_proactor_, ACE_Proactor(impl_mirror_recviver_,1),-1);

return 0;

}

int golden_server::create_proactor_mirror_sender()

{

ACE_NEW_RETURN(impl_mirror_sender_, ACE_WIN32_Proactor(0,1),-1);

ACE_NEW_RETURN(mirror_sender_proactor_, ACE_Proactor(impl_mirror_sender_,1),-1);

return 0;

}

线程池

大多数网络服务器都被设计成能同时处理多个客户请求。使用反应式事件处理、多个进程和多个线程。在构建多线程服务器时,咱们拥有多种选择,包括:为每一个请求派生一个新线程、为每一个链接/会话派生一个新线程、预先派生一池受管线程,也就是建立一个线程池。在Golden Server设计中咱们采用了了线程池的方法。

线程池模型有两种变种,每种都有不一样的性能特征:

  1. 半同步/半异步模型。在这种模型中,一个侦听会异步的接收请求,并在某个队列中缓冲它们。另一组工做者线程负责同步地处理这些请求。

  2. 领导者/跟随着模型。在这种模型总,有一个线程是领导者,其他线程是在线程池中的跟随者。当请求到达时,领导者会拾取它,并从跟随者中选取一个新的领导者,而后继续处理该请求。所以,在这种模型中,接收请求的线程池就是处理它的线程。

领导者/跟随者模型中,只用了一组线程等待新请求,并处理请求。一个线程被选做领导者,阻塞在“到来的请求源”上,当请求到达时,领导者线程首先获取请求,把某个跟随者提高为领导者,而后继续处理所收到的请求。新领导者在请求源上等待新的请求,与此同时旧领导者会处理刚刚收到的请求,一旦就领导者完成处理,它就会做为跟随者线程回到线程池的末尾。

领导者/跟随者模型的一个优势是性能获得了提升,由于不用进行线程间的上下文切换。但同时这种模型也是复杂的。在程序中单个ACE_Task封装了线程池中的全部线程。

class golden_aio :public ACE_Task<ACE_SYNCH>

{

public:

golden_aio(int number_of_connection)

/// ACE_TASK的虚拟方法。用来启动svc

virtual int open (void * = 0);

///初始化

virtual int init(u_short ,ACE_Proactor* );

///结束并关闭

virtual int fini();

/// Run by a daemon thread to handle deferred processing

virtual int svc (void);

};

int golden_aio::open (void * )

{

return activate (THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED ,number_of_thread_);

}

int golden_aio::svc()

{

ACE_Proactor::instance()->proactor_run_event_loop ();

return 1 ;

}

每一个线程启动时,首先会经过调用activate(),将任务转化为运行在一个或多个线程中的主动对象。主动对象执行任务的svc()挂钩方法。每一个线程在执行proactor_run_event_loop ()调用GetQueuedCompletionStatus ()。 若是没有消息到达线程会阻塞在GetQueuedCompletionStatus ()函数上,直到有消息到来那么有一个线程便成了领导者线程。若是领导者线程能很快的处理完全部事情,领导者线程会再次进入等待状态。若是领导者线程不能立刻处理完,则从跟随者线程中指定一个新的领导者线程,本身去处理事件,再也不当领导者。

单体模型

Golden Servergolden_usergolden_authorizegolden_startupgolden_protgolden_system_infogolden_dir_visitorgolden_mirror_sendergolden_mirror_receiver采用单体实例模式。相关单体模式的概念,请自行参考相关手册和书籍。

如何添加一个接口应用

Goldensdk模块

  1. 打开goldensdk.h头文件,根据接口类型找到合适的位置,添加接口的声明、接口的描述注释。

  2. 打开goldensdk.cpp文件,添加接口的空实现。

  3. 打开goldensdk.def文件,添加接口名称。

  4. 按照步骤完成Goldenserver模块

  5. goldensdk.cpp文实现已经添加的空接口。

Goldenserver模块

  1. golden_message_protocol.h中添加自定义的报文数据结构定义和报文消息ID , 命名规则以下GOLDEN_PACK_XXXGOLDEN_PACK_XXX_RESULTMESSAGE_XXXMESSAGE_XXX_RESULT

  2. golden_message_protocol.cpp中添加用来计算报文数据结构大小的函数声明和整编\解编数据流的函数声明。命名规则以下:sizeof_GOLDEN_PACK_XXX() sizeof_GOLDEN_PACK_XXX_RESULT()int operator<<(ACE_OutputCDR &cdr , const GOLDEN_PACK_XXX &spack) int operator<<(ACE_OutputCDR &cdr , const GOLDEN_PACK_XXX_RESUL &spack);

  3. golden_message_protocol.cpp中添加用来计算报文数据结构大小的函数实现和整编\解编数据流的函数实现。

  4. golden_message_protocol.cpp中添加所有变量_gstring中的内容,用来描述报文内容。

  5. goldeserver.h中的golden_aio_handler类中添加处理报文的成员函数声明。命名规则以下:int msg_xxx() int msg_xxx_result() .

  6. goldeserver.cpp中添加处理报文的成员函数的空实现golden_aio_handler:: msg_xxx(), golden_aio_handler:: msg_xxx_result() .

  7. goldeserver.cppint golden_aio_handler::handle_msg_pack()函数中添加报文处理项。

  8. 实现f步骤中定义的成员函数空实现。

参考文献

  1. C++网络编程卷一--- 运用ACE和模式消除复杂性

  2. C++网络编程卷二--- 基于ACE和框架的系统化复用

  3. ACE程序员指南--- 网络与系统编程的实用设计模式

  4. 设计模式:可复用面向对象软件的基础

相关文章
相关标签/搜索