boost::asio的io_service处理过程

1.主线程定义回调对象react

2.调用io object的操做windows

3.io object会另开线程,定义opertion op来执行操做,同时将回调对象加到op的do_complete上。进行操做网络

4.完成操做加入完成队列架构

5.io_service线程循环从完成队列取事件,调用其事件对应的回调函数并发

 

 

 

 

 

Operation

还记得前面咱们在分析resolver的实现的时候,挖了一个关于operation的坑?为了避免让本身陷进去,如今来填吧;接下来咱们就来看看asio中的各类operationapp

 

和前面提到过的service的相似,这里的operation也分为两大系:IOCP EnableDisable系列。这里咱们重点关注下图中红色部分表示的IOCP Enable系列operation异步

 


  

OVERLAPPED基类

从上图能够看到,全部IOCP Enableoperation,其基类都是struct OVERLAPPED结构,该结构是Win32进行交叠IO一个很是重要的结构,用以异步执行过程当中的参数传递。全部的operation直接从该结构继承的结果,就是全部operation对象,能够直接做为OVERLAPPED结构在异步函数中进行传递。socket

 

例如在win_iocp_socket_service_base中,为了启动一个receive的异步操做, start_receive_op函数就直接把传递进来的operation指针做为OVERLAPPED结构传递给::WSARecv函数,从而发起一个异步服务请求。async

void win_iocp_socket_service_base::start_receive_op(tcp

   win_iocp_socket_service_base::base_implementation_type& impl,

   WSABUF* buffers, std::size_t buffer_count,

   socket_base::message_flags flags, bool noop,operation* op)

{

 update_cancellation_thread_id(impl);

 iocp_service_.work_started();

 

 if (noop)

   iocp_service_.on_completion(op);

 else if (!is_open(impl))

   iocp_service_.on_completion(op, boost::asio::error::bad_descriptor);

 else

 {

   DWORD bytes_transferred = 0;

   DWORD recv_flags = flags;

   int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count),

       &bytes_transferred, &recv_flags, op, 0);

   DWORD last_error = ::WSAGetLastError();

   if (last_error == ERROR_NETNAME_DELETED)

     last_error = WSAECONNRESET;

   else if (last_error == ERROR_PORT_UNREACHABLE)

     last_error = WSAECONNREFUSED;

   if (result != 0 && last_error != WSA_IO_PENDING)

     iocp_service_.on_completion(op, last_error, bytes_transferred);

   else

     iocp_service_.on_pending(op);

 }

}

 

执行流程

关于operation对象的建立、传递,以及完成handler的执行序列等,使用下图能够清晰的描述。

 


  

下表反映了Windows环境下,部分的异步请求所对应的服务、win函数、operation等信息:

 

异步请求

服务

start op

Win32函数

对应operation

ip::tcp::socket::async_connect

win_iocp_socket_service

start_connect_op()

::connect

reactive_socket_connect_op

ip::tcp::socket::async_read_some

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_receive

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_write_some

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::socket::async_send

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::acceptor::async_accept

start_accept_op()

::AcceptEx

win_iocp_socket_accept_op

 

 

 

 

 

ip::tcp::resolver::async_resolve

resolver_service

start_resolve_op()

::getaddrinfo

resolve_op

 

 

静态的do_complete

不知你是否注意到,在operation的类图中,全部从operation继承的子类,都定义了一个do_complete()函数,然而该函数声明为static,这又是为什么呢?

 

咱们以win_iocp_socket_recv_op为例来进行说明。该类中的do_complete是这样声明的:

     staticvoid do_complete(io_service_impl* owner,

         operation* base,

         const boost::system::error_code& result_ec,

         std::size_t bytes_transferred)

 

该类的构造函数,又把此函数地址传递给父类win_iocp_operation去初始化父类成员,这两个类的构造函数分别以下,请注意加粗代码:

win_iocp_socket_recv_op ::

win_iocp_socket_recv_op(socket_ops::state_type state,

     socket_ops::weak_cancel_token_type cancel_token,

     const MutableBufferSequence& buffers, Handler& handler)

   :operation(&win_iocp_socket_recv_op::do_complete),

         state_(state),

         cancel_token_(cancel_token),

         buffers_(buffers),

         handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))

   {

   }

 

win_iocp_operation ::win_iocp_operation(func_type func)

         : next_(0),

         func_(func)

   {

         reset();

   }

 

至此,咱们明白,将do_complete声明为static,能够方便获取函数指针,并在父类中进行回调。那么,不只要问,既然两个类存在继承关系,那么为什么不将do_complete声明为虚函数呢?

 

再回头看看这些类的最顶层基类,就会明白的。最顶层的OVERLAPPED基类,使得将operation对象做为OVERLAPPED对象在异步函数中进行传递成为可能;若是将do_complete声明为虚函数,则多数编译器会在对象起始位置放置vptr,这样就改变了内存布局,从而不能再把operation对象直接做为OVERLAPPED对象进行传递了。

 

固然,必定要用虚函数的话,也不是不可能,只是在传递对象的时候,就须要考虑到vptr的存在,这会有两个方面的问题:一是进行多态类型转换时,效率上的损失;二是各家编译器对vtpr的实现各不相同,跨平台的asio就须要进行多种适配,这无疑又过于烦躁了。因而做者就采起了最为简单有效的方式——用static函数来进行回调——简单,就美。

 

win_iocp_io_service的实现

Windows NT环境下(IOCP Enabled),win_iocp_io_service表明着io_service,是整个asio的运转核心。本节开始来分析该类的实现。

 

从类的命名也能够看出,IOCP是该实现的核心。IOCPIO Completion Port IOCP)在windows上,能够说是效率最高的异步IO模型了,他使用有限的线程,处理尽量多的并发IO请求。该模型虽然说能够应用于各类IO处理,但目前应用较多的仍是网络IO方面。

 

咱们都知道,在Window是环境下使用IOCP,基本上须要这样几个步骤:

  1. 使用Win函数CreateIoCompletionPort()建立一个完成端口对象;
  2. 建立一个IO对象,如用于listensocket对象;
  3. 再次调用CreateIoCompletionPort()函数,分别在参数中指定第二步建立的IO对象和第一步建立的完成端口对象。因为指定了这两个参数,这一次的调用,只是告诉系统,后续该IO对象上全部的完成事件,都投递到绑定的完成端口上。
  4. 建立一个线程或者线程池,用以服务完成端口事件;

全部这些线程调用GetQueuedCompletionStatus()函数等待一个完成端口事件的到来;

  1. 进行异步调用,例如WSASend()等操做。
  2. 在系统执行完异步操做并把事件投递到端口上,或者客户本身调用了PostQueuedCompletionStatus()函数,使得在完成端口上等待的一个线程苏醒,执行后续的服务操做。

 

那么,这些步骤,是如何分散到asio中的呢?来吧,先从完成端口建立开始。

完成端口的建立

如上所述,完成端口的建立,须要调用CreateIoCompletionPort()函数,在win_iocp_io_service的构造函数中,就有这样的操做:

win_iocp_io_service::win_iocp_io_service(

   boost::asio::io_service& io_service, size_tconcurrency_hint)

 : boost::asio::detail::service_base<win_iocp_io_service>(io_service),

   iocp_(),

   outstanding_work_(0),

   stopped_(0),

   stop_event_posted_(0),

   shutdown_(0),

   dispatch_required_(0)

{

 BOOST_ASIO_HANDLER_TRACKING_INIT;

 

 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,

     static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));

 if (!iocp_.handle)

 {

   DWORD last_error = ::GetLastError();

   boost::system::error_code ec(last_error,

       boost::asio::error::get_system_category());

   boost::asio::detail::throw_error(ec, "iocp");

 }

}

 

win_iocp_io_service的构造函数,负责建立一个完成端口,并把此完成端口对象的句柄交给一个auto_handle进行管理——auto_handle的惟一用途,就是在对象析构时,调用::CloseHandle()windows句柄资源关闭,从而保证不会资源泄露。

 

咱们在windows环境下,声明一个boost::asio::io_service对象,其内部就建立了一个win_iocp_io_service的实例;所以,一个io_service对象就对应着一个完成端口对象——这也就能够解释,为何全部的IO Object都须要一个io_service参数了——这样,你们就好公用外面定义好的完成端口对象。

 

除了io_service对象会建立一个完成端口对象,事实上,在asio中,另一个service也会建立一个,这就是boost::asio::ip::resolver_service。该类对应的detail实现boost::asio::detail::resolver_service中,有一个数据成员是: io_service,这样就一样建立了一个完成端口对象:

    namespace boost {

namespace asio {

namespace detail {

 

class resolver_service_base

{

...

protected:

// Private io_service used for performing asynchronous host resolution.

            scoped_ptr<boost::asio::io_service> work_io_service_;

...

 

至于该完成端口的用途如何,咱们在后续部分再来讲明——搽,又开始挖坑了。

完成端口的绑定

在建立了io对象后,例如socket,就须要将此对象和完成端口对象绑定起来,以指示操做系统将该io对象上后续全部的完成事件发送到某个完成端口上,该操做一样是由CreateIoCompletionPort()函数完成,只是所使用的参数不一样。

 

win_iocp_io_service中,这个操做由下面的代码完成——请注意参数的差异:

boost::system::error_code win_iocp_io_service::register_handle(

   HANDLE handle, boost::system::error_code& ec)

{

 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0)

 {

   DWORD last_error = ::GetLastError();

   ec = boost::system::error_code(last_error,

       boost::asio::error::get_system_category());

 }

 else

 {

   ec = boost::system::error_code();

 }

 return ec;

}

 

经过代码搜索,咱们发现函数win_iocp_socket_service_base::do_open()内部调用了register_handle();该函数的做用是打开一个socket(其中调用了socket函数socket()去建立一个socket),也就是说,在打开一个socket后,就把该socket绑定到指定的完成端口上,这样,后续的事件就会发送到完成端口了。

 

此外还有另外的和assign相关的两个函数也调用了register_handle(),再也不贴出其代码了。

 

线程函数

IOCP要求至少有一个线程进行服务,也能够有一堆线程;io_service早就为这些线程准备好了服务例程,即io_service::run()函数。

  • 若是应用只打算使用一个线程进行服务,那么在主线程中准备好了异步请求后,调用io_service::run()便可。注意,必须先发起一个异步请求,而后才能调用run()。参考一下run()的实现就会明白。
  • 若是打算用多个线程进行服务,能够建立多个线程,指定io_service::run()做为线程函数便可。一个最简单的示例是:

void server::run()

{

 // Create a pool of threads to run all of the io_services.

 std::vector<boost::shared_ptr<boost::thread> > threads;

 for (std::size_t i = 0; i < thread_pool_size_; ++i)

 {

boost::shared_ptr<boost::thread>

   thread(

       new boost::thread(

               boost::bind(&boost::asio::io_service::run, &io_service_)

           )

       );

   threads.push_back(thread);

 }

 

 // Wait for all threads in the pool to exit.

 for (std::size_t i = 0; i < threads.size(); ++i)

   threads[i]->join();

}

 

因为io_service::run()又是委托win_iocp_io_service::run()来实现的,咱们来看看后者的实现:

size_t win_iocp_io_service::run(boost::system::error_code& ec)

{

 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)

 {

   stop();

   ec = boost::system::error_code();

   return 0;

 }

 

 win_iocp_thread_info this_thread;

 thread_call_stack::context ctx(this, this_thread);

 

 size_t n = 0;

  while (do_one(true, ec))

   if (n != (std::numeric_limits<size_t>::max)())

     ++n;

 return n;

}

 

run()首先检查是否有须要处理的操做,若是没有,函数退出;win_iocp_io_service使用outstanding_work_来记录当前须要处理的任务数。若是该数值不为0,则委托do_one函数继续处理——asio中,全部的脏活累活都在这里处理了。

 

win_iocp_io_service::do_one函数较长,咱们只贴出核心代码

size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)

{

 for (;;)

 {

   // Try to acquire responsibility for dispatching timers and completed ops.

   if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1

   {

     mutex::scoped_lock lock(dispatch_mutex_);

 

     // Dispatch pending timers and operations.

     op_queue<win_iocp_operation> ops;

     ops.push(completed_ops_);

     timer_queues_.get_ready_timers(ops);

     post_deferred_completions(ops);?#2

     update_timeout();

   }

 

   // Get the next operation from the queue.

   DWORD bytes_transferred = 0;

   dword_ptr_t completion_key = 0;

   LPOVERLAPPED overlapped = 0;

::SetLastError(0);

   BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,

       &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3

   DWORD last_error = ::GetLastError();

 

   if (overlapped)

   {

     win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4

     boost::system::error_code result_ec(last_error,

         boost::asio::error::get_system_category());

 

     // We may have been passed the last_error and bytes_transferred in the

     // OVERLAPPED structure itself.

     if (completion_key == overlapped_contains_result)

     {

       result_ec = boost::system::error_code(static_cast<int>(op->Offset),

           *reinterpret_cast<boost::system::error_category*>(op->Internal));

       bytes_transferred = op->OffsetHigh;

     }

 

     // Otherwise ensure any result has been saved into the OVERLAPPED

     // structure.

     else

     {

       op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());

       op->Offset = result_ec.value();

       op->OffsetHigh = bytes_transferred;

     }

 

     // Dispatch the operation only if ready. The operation may not be ready

     // if the initiating function (e.g. a call to WSARecv) has not yet

     // returned. This is because the initiating function still wants access

     // to the operation's OVERLAPPED structure.

     if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)

     {

       // Ensure the count of outstanding work is decremented on block exit.

       work_finished_on_block_exit on_exit = { this };?#5

       (void)on_exit;?#6

 

       op->complete(*this, result_ec, bytes_transferred);?#7

       ec = boost::system::error_code();

       return 1;

     }

   }

   else if (!ok)

{

...

 

作一下简要说明:

-  #1变量dispatch_required_记录了因为资源忙,而没有成功投递的操做数;全部这些操做都记录在队列completed_ops_中;

-  #2将全部须要投递的操做,投递出去;至于什么样的操做须要投递,什么时候投递,以及为先前会投递失败,失败后如何处理等,咱们后续说明——再次挖坑了。

-  #3IOCP的核心操做函数GetQueuedCompletionStatus()出现了。该函数致使线程在完成端口上进行等待,直到超时或者某个完成端口数据包到来。

-  #4注意这里将 OVERLAPPED结构直接转换为 operation对象。相关内容在前面的operation:OVERLAPPED基类部分已经有说明。

-  #5该变量保证在操做完成,return以后,win_iocp_io_service对象所记录的任务数outstanding_work_会自动减1——是啊,辛辛苦苦作的事儿,能不记录下来嘛!

-  #6这一行从功能上讲没有什么特别的用途;不过有了这一行,能够抑制有些编译器针对 #5所声明的变量没有被使用的编译器警告;

-  #7调用operation对象的complete()函数,从而调用到异步操做所设定的回调函数。具体流程参考operation:执行流程

 

任务投递

上述的线程函数,会在GetQueuedCompletionStatus()函数上进行等待,直到超时或者有完成端口数据包到来;

 

完成端口数据包,有两个来源:一个是用户所请求的异步操做完成,异步服务执行者(这里是操做系统)向该完成端口投递完成端口数据包;另一种状况是,用户本身使用IOCP的另一个核心函数PostQueuedCompletionStatus()向完成端口投递数据包;

 

通常的异步操做请求,是不须要用户本身主动向完成端口投递数据的,例如async_read, asyn_write等操做;

 

有另一些操做,因为没有对应或者做者并无采用支持OVERLAPPED IO操做的Win32函数,就须要实现者本身管理完成事件,并进行完成端口数据包的投递,好比:

  • async_resolve:因为系统没有提供对应的OVERLAPPED IO操做,须要实现者本身管理,因此其本身进行投递
  • async_connect:因为做者并无采用支持OVERLAPPED IOConnectEx()版本的链接函数,而是采用了标准的socket函数connect()进行链接,因此也须要本身进行投递

 

另外还有一些io_service提供的操做,例如请求io_service执行代为执行指定handler的操做:

  • dispatch(handler)
  • post(handler)

 

全部这些须要本身投递完成端口数据包的操做,基本上都是这样一个投递流程:

  • 调用win_iocp_io_service::post_immediate_completion(op)
    • 调用work_started()outstanding_work_ 1
    • 调用post_deferred_completion(op)
      • 因为自行管理,主动将op->ready_置为 1,代表op就绪
      • 调用PostQueuedCompletionStatus(op)进行投递
      • 若是投递失败,则把该op放置到等待dispatch的队列completed_ops_ 中,待下一次do_one()执行时,再次投递

 

OK,至此,基本分析完了operation的投递,总数填了一个前面挖下的坑。

Resolver本身的IOCP

前面说过,Resolver本身会建立一个IOCP,为何会这样呢?因为Win32下面没有提供对应于地址解析的overlapped版本的函数,为了实现async_resolve操做,做者本身实现了这样一个异步服务。在resolver_service内部,有一个io_service数据成员,该数据成员建立了一个IOCP;除此以外,该service内部还启动一个工做线程来执行io_service::run(),使用此线程来模拟异步服务。

 

使用resolver进行async_resolve的详细过程以下:

Main Thread (IOCP#1)

 

Resolver Thread (IOCP #2)

 

 

 

1.构建io_service对象, IOCP#1被建立

 

 

 

 

 

2.构建 resolver对象, IOCP#2被建立,同时该resolver持有io_service的引用

 

 

 

 

 

3.发起异步调用:resolver.async_resolve()

 

 

 

 

 

4. resolve_op被建立

 

 

 

 

 

5. Resolver线程启动,主线程开始等待

 

 

 

 

 

 

 

6.开始运行,激活等待事件,并在 IOCP#2上开始等待

 

 

 

7.线程恢复执行;将op投递到 IOCP#2

 

 

 

 

 

 

 

8.执行op->do_complete()操做,地址解析完成后,将op再回投给IOCP#1

 

 

 

9. do_one()获得Resolver线程投递回来的op开始执行op->do_complete()操做,此时回调async_resolve所设置的handler

 

 

 

 

 

10.结束

 

 

 

请注意step8 step9执行一样一个op->do_complete()函数,为何操做不同呢?看其实现就知道,该函数内部,会判断执行此函数时的owner,若是owner是主io_service对象,则说明是在主线程中执行,此时进行handler的回调;不然就说明在工做线程中,就去执行真正的地址解析工做;

任务的取消

针对socket上提交的异步请求,可使用cancel()函数来取消掉该socket上全部没执行的异步请求。

 

使用该函数,在Windows Vista(不含)之前的版本,有几点须要注意:

  • 须要定义BOOST_ASIO_ENABLE_CANCELIO来启用该功能
  • cancel()函数在内部调用Win32函数 CancelIo()
  • 该函数只能取消来自当前线程的异步请求
  • 对于正在执行的异步操做,则要看异步服务提供者是如何实现的了,可能会被取消,也可能不会;

针对这些问题,另外的替代方案是:

  • Window是上定义BOOST_ASIO_DISABLE_IOCP来禁用IOCP,使用老式的reactor模式(及select IO)。
  • 或者使用close()来关闭socket,如此一来全部未被执行的请求则都会被取消掉。

 

windows vista及后续版本中,cancel()函数在内部调用Win32函数 CancelIoEx(),该函数能够取消来自任何线程的异步操做请求,不存在上述问题。

 

须要注意的是,即便异步请求被取消了,所指定的handler也会被执行,只是传递的error code 为:boost::asio::error::operation_aborted

win_iocp_socket_service实现

service提供了windows下全部socket相关的功能,是asiowindows环境中一个很是重要的角色,他所提供的函数主要分下面两类:

  • XXXXX(), async_XXXXX()对某个操做的同步、异步函数接口;主要被上层服务调用;例如connect(), async_connect()等;
  • start_XXXXX_op() :windows发出对应的异步操做请求,例如WSARecv

 

不过关于该类的实现前面已经作了较多的涉及,再也不单独详述了。

前摄器模式

如今咱们已经把Windows环境下所涉及到的关键部件都涉及到了,此刻咱们再回过头来,从高层俯瞰一下asio的架构,看看是否会有不同的感觉呢?事实上,asio的文档用下面的图来讲明asio的高层架构——前摄器模式,咱们也从这个图开始:

boost.asio 学习笔记05——asio的windows实现 - 地线 - 别再让虚假消息充斥这本已混乱的世界

 

 

呵呵,其实这张图,从一开始就是为了表达Proactor(前摄器)模式的,基本上它和asio没半毛钱关系,只不过asio既支持同步IO,又支持异步IO,在异步IO部分,是参照Proactor模式来实现的。下面咱们来分别说说asio的前摄器模式中的各个组件:

  • Initiator,(初始化器?)中文名还真不清楚,不过其实就是客户代码,甚至能够简单理解到main函数,全部的是是非非,都是从这儿开始的。
  • Asynchronous Operation,定义的一系列异步操做,对应到Windows平台,诸如AcceptExWSASendWSARecv等函数。在asio中,这些函数封装在win_iocp_socket_service resolver_service类中。
  • Asynchronous Operation Processor,异步操做处理器,他负责执行异步操做,并在操做完成后,把完成事件投放到完成事件队列上。

仅仅从asio使用者的角度看,高层的stream_socket_service类就是一个这样的处理器,由于从tcp::socket发送的异步操做都是由其完成处理的。可是从真正实现的角度看,这样的异步操做在Windwos上,大部分由操做系统负责完成的,另一部分由asio本身负责处理,如resolver_service,所以Windows操做系统和asio一块儿组成了异步操做处理器。

  • Completion Handler,完成事件处理器;这是由用户本身定义的一个函数(函数对象),在异步操做完成后,由前摄器负责把该函数调用起来。

Windows平台上,io_service类经过win_iocp_io_service类的do_one()函数把每一个异步操做所设定的completion handler调用起来。

  • Completion Event Queue完成事件队列,存储由异步操做处理器发送过来的完成事件,当异步事件多路分离器将其中一个事件取走以后,该事件从队列中删除;

Windows上,asio的完成事件队列由操做系统负责管理;只不过该队列中的数据有两个来源,一个是Windows内部,另一个是asio中本身PostQueuedCompletionStatus()所提交的事件。

  • Asynchronous Event Demultiplexer,异步事件多路分离器,他的做用就是在完成事件队列上等待,一旦有事件到来,他就把该事件返回给调用者。

Windows上,这一功能也是由操做系统完成的,具体来讲,我认为是由GetQueuedCompletionStatus完成的,而该函数时由do_one()调用的,所以,从高层的角度来看,这个分离器,也是由io_service负责的。

  • Proactor,前摄器,负责调度异步事件多路分离器去干活,并在异步操做完成时,调度所对应的Completion Handler。在asio中,这部分由io_service来作,具体Windows就是win_iocp_io_service

 

基于上述信息,咱们重绘practor模式架构图以下:

 

 

  

相关文章
相关标签/搜索