boost.ASIO-多是下一代C++标准的网络库

曾几什么时候,Boost中有一个Socket库,但后来没有了下文,C++社区一直在翘首盼望一个标准网络库的出现,网络上开源的网络库也有很多,例如Apache Portable Runtime就是比较著名的一个,也有像ACE这样重量级的网络框架。
去年,Boost将ASIO归入了本身的体系,因为Boost的影响力,ASIO有机会成为标准网络库。做者Chris Kohlhoff以ASIO为样本向C++标准委员会提交了一个网络库建议书,里面提到:
ASIO的覆盖范围:
 Networking using TCP and UDP, including support for multicast.
 Client and server applications.
 Scalability to handle many concurrent connections.
 Protocol independence between IPv4 and IPv6.
 Name resolution (i.e. DNS).
 Timers.
不在ASIO考虑范围以内的:
 Protocol implementations such as HTTP, SMTP or FTP.
 Encryption (e.g. SSL, TLS).
 Operating system specific demultiplexing APIs.
 Support for realtime environments.
 QoS-enabled sockets.
 Other TCP/IP protocols such as ICMP.
 Functions and classes for enumerating network interfaces.
Boost.Asio支持如下平台:
 Win32 using Visual C++ 7.1 and Visual C++ 8.0.
 Win32 using Borland C++Builder 6 patch 4.
 Win32 using MinGW.
 Win32 using Cygwin.
 Linux (2.4 or 2.6 kernels) using g++ 3.3 or later.
 Solaris using g++ 3.3 or later.
 Mac OS X 10.4 using g++ 3.3 or later.
 QNX Neutrino 6.3 using g++ 3.3 or later.
 FreeBSD using g++ 3.3 or later.
参考ACE的Proactor模式,ASIO采用异步通信机制,同时参考了Symbian C++ sockets API、Microsoft .NET socket classes和Open Group的Extended Sockets API。

 

usidc5 2011-01-18 23:01

Asio 是一个跨平台的C++开发包用来处理网络和低级I/O编程,经过先进的C++方法为开发人员提供连续异步模型。
示例代码:
  void handle_read(const asio::error_code& error,
      size_t bytes_transferred)
  {
    if (!error)
    {
      asio::async_write(socket_,
          asio::buffer(data_, bytes_transferred),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_write,
              shared_from_this(),
              asio::placeholders::error)));
    }
  }

  void handle_write(const asio::error_code& error)
  {
    if (!error)
    {
      socket_.async_read_some(asio::buffer(data_),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_read,
              shared_from_this(),
              asio::placeholders::error,
              asio::placeholders::bytes_transferred)));
    }
  }

 

usidc5 2011-06-25 15:36
boost真是个好东西,每次去逛总会有惊喜。此次的惊喜是发现了asio,一个跨平台的支持异步I/O的网络通信socket库。

异步I/O是一种高效的I/O模型,在Windows平台下这种机制的表明就是IOCP完成端口模型。事实上,asio在Windows平台上的实现就是对IOCP的封装。

其实在网络通信这一块,已经有许多成熟的框架了,最典型的就是ACE,一个网络通信设计模式的集大成者。但ACE对我来讲过重型了,并且其起源于90年代,与标准库的集成不是太好,好比ACE就有本身的容器类。。。总而言之,ACE是一个庞然大物,威力无穷,但也显得比较笨重。

C++发展到如今,库的设计风格也愈来愈趋向于泛型,boost就是一个典型,并且boost社区跟C++标准委员会的密切关系,使得进入boost的程序库有更大的机会加入下一代的C++标准库。

所以无论从设计风格(我不否定我也喜欢追时髦;)),仍是从功利的角度看,学习asio都是一笔不错的投资。

学习她,首先要安装她。asio要求首先安装boost,下面我把本身的安装过程描述一遍,这确实仍是颇费心思的。

首先要先安装boost,这但是一个漫长而又炎热的夏天。。。万幸的是我之前已经装过了,嘿嘿。我装的是boost_1_33_0,为了完整说明,我这里也简单列了下boost的安装步骤,这也是从网上找来的。

step1.从www.boost.org下载boost库

step2 在 tools\build\jam_src目录下 运行build.bat来生成jam

step3 设置环境变量(后面的%PATH%要加)

PATH=%boost的绝对路径%\tools\build\jam_src\bin.ntx86;%PATH% 
PATH=%boost的绝对路径%;%PATH%

For Visial Studio 6.0 
SET MSVC_ROOT="VC6的安装路径" 
SET VISUALC="VC6的安装路径" 
Example: 
SET MSVC_ROOT="c:\Program Files\Microsoft Visual Studio\VC98"

For Visual Studio.net 
SET VC7_ROOT="vs.NET安装路径" 
Example: 
SET VC7_ROOT="C:\Program Files\Microsoft Visual Studio .NET\VC7"

For Visual Studio.net 2003 
SET VC71_ROOT="vs.NET2003安装路径" 
Example: 
set VC71_ROOT="C:\Program Files\Microsoft Visual Studio .NET 2003\Vc7"

step 4 编译boost库 
bjam "-sTOOLS=%编译器%" install
Visual Studio 6.0 %编译器%=msvc 
Visual Studio .NET %编译器%=vc7 
Visual Studio .NET 2003 %编译器%=vc-7_1

我用的是VC7.1,照着这个指示,当时编译了很久才完成。不过我在最后一步时,忘了加上install。这也没什么,你能够在boost下新建一个lib文件夹,而后把bin目录下全部的库文件都拷贝进来,而后在你的编译器里进行适当的路径设置(头文件路径,库文件路径),就能够使用boost了。

安装好boost后(我装在了E:\boost_1_33_0),就能够安装asio了,先去http://asio.sourceforge.net下载,如今是最新版本0.3.8。
注意下载带有boost前缀(boost_asio_0_3_8rc2.zip)的zip文件,解开后能够看到两个目录:boost和libs。把boost里面的全部文件拷贝到E:\boost_1_33_0\boost下面,注意里面有个detail目录不能直接覆盖,而是要把其中的文件(identifier.hpp)拷贝到E:\boost_1_33_0\boost\detail中去;一样把libs里面的文件夹都拷贝到E:\boost_1_33_0\libs下,就能够了。

好了,接下来就到了激动人心的时刻,让咱们来开始编译asio示例,我找了个asio自带的最简单的例子,关于同步定时器的,5秒后超时打印Hello, world!

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

int main()
{
       boost::asio::io_service io;

       boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
       t.wait();

       std::cout << "Hello, world!\n";

       return 0;
}

先别管具体的代码,开始编译。。。果真不出所料,哪有那么顺畅的事情;)

正在编译...
main.cpp
e:\boost_1_33_0\boost\asio\detail\push_options.hpp(103) : fatal error C1189: #error :       Multithreaded RTL must be selected.

哦,原来须要多线程库,这好办,在项目属性里:配置属性 -> C/C++ -> 代码生成 -> 运行时库 ->多线程调试(/MTd),再试一下。

正在编译...
main.cpp
Please define _WIN32_WINNT or _WIN32_WINDOWS appropriately
Assuming _WIN32_WINNT=0x0500 (i.e. Windows 2000 target)
正在连接...
LINK : fatal error LNK1104: 没法打开文件“libboost_system-vc71-mt-sgd-1_33.lib”

好家伙,一下出来俩。第一个是windows版本支持问题,我在项目属性里把宏_WIN32_WINNT加到预处理器中,如今编译经过了,但连接仍是不行:

正在连接...
LINK : fatal error LNK1104: 没法打开文件“libboost_system-vc71-mt-sgd-1_33.lib”

找不到system库。我纳闷了一会,由于asio主页上明明写着大部分功能只须要boost头文件便可。所以我又照着asio主页上的说明,把_BOOST_ALL_NO_LIB也加到预处理器中去,但仍是不行。

后来我又到下载的文件中去找,发现system是asio自带的一个库,要想使用asio,就必须先编译这个库,OMG~

我还没单独编译过boost的一个库,所以又去网上找了找,终于找到了,原来也不是很难。基本步骤仍是跟编译整个boost同样,只不过在最后一步时,要换成这样:

bjam "-sTOOLS=%编译器%" --with-system install

就能够编译system库了。最后检查下编译器的头文件和库文件路径是否正确,再从新试一遍,终于大功告成!

我怀着欣喜的心情开始测试asio自带的tutorial程序,前面几个关于定时器的运行的很正常,但到了后来测试daytime1的时候,连接又有问题了。

正在编译...
main.cpp
f:\My-SmartWin-Demo\asio_demo\main.cpp(56) : warning C4267: “参数” : 从“size_t”转换到“std::streamsize”,可能丢失数据
正在连接...
main.obj : error LNK2019: 没法解析的外部符号 "public: static class boost::system::error_category __cdecl boost::system::error_code::new_category(int (__cdecl*)(class boost::system::error_code const &),class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > (__cdecl*)(class boost::system::error_code const &),class std::basic_string<unsigned short,struct std::char_traits<unsigned short>,class std::allocator<unsigned short> > (__cdecl*)(class boost::system::error_code const &))" (?new_category@error_code@system@boost@@SA?AVerror_category@23@P6AHABV123@@ZP6A?AV?$basic_string@DU?$char_traits@D@std@@V?$allocator@D@2@@std@@0@ZP6A?AV?$basic_string@GU?$char_traits@G@std@@V?$allocator@G@2@@6@0@Z@Z) ,该符号在函数 _$E47 中被引用
Debug/asio_demo.exe : fatal error LNK1120: 1 个没法解析的外部命令

前面那个警告无论,后面连接时说一个static函数new_category(...)只有声明没有实现,我找了下,这个函数在system库里error_code.hpp中有声明,error_code.cpp也有实现,并且明明system库我也编译成功,并加入相关路径中,怎么仍是会出错?

郁闷了半天,后来干脆把error_code.cpp加入到daytime1工程中一块儿编译,终于完全搞定了。

真是TMD不容易啊。

 

usidc5 2011-06-25 15:39

对于一个网络程序的服务器端咱们须要提供的是服务器的address,和服务开放的端口号port。
在asio库中首先咱们必须使用一个io_service类来支持全部的IO功能。须要注意到是咱们必须调用io_service_my.run()函数来开启IO服务的事件循环以使功能都能被正常使用。
boost::asio::io_service io_service_my;
如今咱们能够基于这个io_service_my来关联构建一下几个类:
1. boost::asio::ip::tcp::acceptor acceptor_my(io_service_my); 
由于LPD的实现是基于TCP传输协议,因此也使用了TCP的acceptor来接收client发来的链接。
2.  boost::asio::ip::tcp::resolver resolver_my(io_service_my);
boost::asio::ip::tcp::resolver::query query_my(address,port);
boost::asio::ip::tcp::endpoint endpoint_my = *resolver.resolve(query_my);
这几个类主要是用来实现对地址的解析和绑定终端节点到相应的开放端口号上。首先构造一个关联到io_service_my的解析器resolver_my。而后让解析器resolver_my执行resolve
()函数来解析query_my指定的address和port到一个终端节点endpoint_my上。咱们会看到这个endpoint_my终端节点会被绑定到这个acceptor_my接收器上。
3. boost::asio::ip::tcp::socket socket_my(io_service_my);
定义一个基于TCP协议的socket关联到io_service_my对象上。
在这些准备工做作完后咱们开始一些实际的动做:
/*
* 打开一个使用由endpoint_my指定的协议类型的接收器,这个protocol()函数会自动返回与endpoint_my关联的协
* 议类型。
*/
acceptor_my.open(endpoint_my.protocol());

/*
* 设置选项容许socket绑定到一个已经正在被使用的地址。
*/
acceptor_my.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true);

/*
* 把接收器绑定到已经被设置过的endpoint_my。
*/
acceptor_my.bind(endpoint_my);

/*
* 接收器开始侦听。
*/
acceptor_my.listen();

/*
* 以同步/异步方式开始接收链接。
*/
acceptor_my.accept(socket_my) //同步
acceptor_my.async_accept(socket_my,
  boost::bind(&handle_accept,boost::asio::placeholders::error));//异步
其中异步的侦听函数原型是:
template<
    typename SocketService,
    typename AcceptHandler>
void async_accept(
    basic_socket< protocol_type, SocketService > & peer,
    AcceptHandler handler);
handler所对应的函数在新链接接收完成后会被调用。这种异步方式实现回调的方法也相似于使用boost::asio::io_service::post(boost::bind(&handle_accept));
注意到bind函数中的&handle_accept,这是函数handle_accept的入口地址,也就是在接收完成后会调用的函数在这里咱们能够继续进行下一步的处理,从socket_my中读取或者写
入数据。

/*
* 调用异步读函数,把接收的数据拷贝到buffer缓存中,其中buffer是由参数buffer_my构造,
* 而buffer_my自己能够是一个容器例如boost::array<char,8192> buffer_my,表示申请了
* 一个8K个char字符型空间。也能够使用例外一种方法实现buffer的构造例如
* boost::asio::buffer(data,size_t);其中data表示指向某种数据类型的指针,
* size_t则表示包含多少个该数据类型的元素。
*/
void handle_accept(boost::system::error_code error)
{
if(!error)
{
  boost::array<char, 8192> buffer_my;
  boost::asio::async_read(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_read, boost::asio::placeholders::error));

}
相似的写程序以下
boost::asio::async_write(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_write, 
     boost::asio::placeholders::error,
     boost::asio::placeholders::bytes_transferred));

最后在全部链接完成以后或是服务器中止的时候别忘记关掉链接。例如
socket_my.close();
acceptor_my.close();
至此一个基于boost::asio库的网络程序的框架就出来了,至于具体的设计类实现能够视需求而定。

 

usidc5 2011-06-25 15:40
摘要:本文经过形像而活泼的语言简单地介绍了Boost::asio库的使用,做为asio的一个入门介绍是很是合适的,能够给人一种新鲜的感受,同时也能让体验到asio的主要内容。本文来自网络,原文在这里。


目录 [隐藏]
ASIO的同步方式
自我介绍
示例代码
小结
ASIO的异步方式
自我介绍
示例代码
小结
ASIO的“便民措施”
端点
超时
统一读写接口
基于流的操做
用ASIO编写UDP通讯程序
用ASIO读写串行口
演示代码
Boost.Asio是一个跨平台的网络及底层IO的C++编程库,它使用现代C++手法实现了统一的异步调用模型。


ASIO的同步方式
ASIO库可以使用TCP、UDP、ICMP、串口来发送/接收数据,下面先介绍TCP协议的读写操做。对于读写方式,ASIO支持同步和异步两种方式,首先登场的是同步方式,下面请同步方式自我介绍一下。


自我介绍
你们好!我是同步方式!


个人主要特色就是执着!全部的操做都要完成或出错才会返回,不过偶的执着被你们称之为阻塞,实在是郁闷~~(场下一片嘘声),其实这样 也是有好处的,好比逻辑清晰,编程比较容易。


在服务器端,我会作个socket交给acceptor对象,让它一直等客户端连进来,连上之后再经过这个socket与客户端通讯, 而全部的通讯都是以阻塞方式进行的,读完或写完才会返回。


在客户端也同样,这时我会拿着socket去链接服务器,固然也是连上或出错了才返回,最后也是以阻塞的方式和服务器通讯。


有人认为同步方式没有异步方式高效,其实这是片面的理解。在单线程的状况下可能确实如此,我不能利用耗时的网络操做这段时间作别的事 情,不是好的统筹方法。不过这个问题能够经过多线程来避免,好比在服务器端让其中一个线程负责等待客户端链接,链接进来后把socket交给另外的线程去 和客户端通讯,这样与一个客户端通讯的同时也能接受其它客户端的链接,主线程也彻底被解放了出来。


个人介绍就有这里,谢谢你们!


示例代码
好,感谢同步方式的自我介绍,如今放出同步方式的演示代码(起立鼓掌!)。


服务器端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;
        // 全部asio类都须要io_service对象
        io_service iosev;
        ip::tcp::acceptor acceptor(iosev, 
        ip::tcp::endpoint(ip::tcp::v4(), 1000));
        for(;;)
        {
                // socket对象
                ip::tcp::socket socket(iosev);
                // 等待直到客户端链接进来
                acceptor.accept(socket);
                // 显示链接进来的客户端
                std::cout << socket.remote_endpoint().address() << std::endl;
                // 向客户端发送hello world!
                boost::system::error_code ec;
                socket.write_some(buffer("hello world!"), ec);


                // 若是出错,打印出错信息
                if(ec)
                {
                        std::cout << 
                                boost::system::system_error(ec).what() << std::endl;
                        break;
                }
                // 与当前客户交互完成后循环继续等待下一客户链接
        }
        return 0;
}
客户端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;


        // 全部asio类都须要io_service对象
        io_service iosev;
        // socket对象
        ip::tcp::socket socket(iosev);
        // 链接端点,这里使用了本机链接,能够修改IP地址测试远程链接
        ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 1000);
        // 链接服务器
        boost::system::error_code ec;
        socket.connect(ep,ec);
        // 若是出错,打印出错信息
        if(ec)
        {
                std::cout << boost::system::system_error(ec).what() << std::endl;
                return -1;
        }
        // 接收数据
        char buf[100];
        size_t len=socket.read_some(buffer(buf), ec);
        std::cout.write(buf, len);


        return 0;
}
小结
从演示代码能够得知


ASIO的TCP协议经过boost::asio::ip名 空间下的tcp类进行通讯。
IP地址(address,address_v4,address_v6)、 端口号和协议版本组成一个端点(tcp:: endpoint)。用于在服务器端生成tcp::acceptor对 象,并在指定端口上等待链接;或者在客户端链接到指定地址的服务器上。
socket是 服务器与客户端通讯的桥梁,链接成功后全部的读写都是经过socket对 象实现的,当socket析 构后,链接自动断 开。
ASIO读写所用的缓冲区用buffer函 数生成,这个函数生成的是一个ASIO内部使用的缓冲区类,它能把数组、指针(同时指定大 小)、std::vector、std::string、boost::array包装成缓冲区类。
ASIO中的函数、类方法都接受一个boost::system::error_code类 型的数据,用于提供出错码。它能够转换成bool测试是否出错,并经过boost::system::system_error类 得到详细的出错信息。另外,也能够不向ASIO的函数或方法提供 boost::system::error_code,这时若是出错的话就会直 接抛出异常,异常类型就是boost::system:: system_error(它是从std::runtime_error继承的)。
ASIO的异步方式
嗯?异步方式好像有点坐不住了,那就请异步方式上场,你们欢迎...


自我介绍
你们好,我是异步方式


和同步方式不一样,我历来不花时间去等那些龟速的IO操做,我只是向系统说一声要作什么,而后就能够作其它事去了。若是系统完成了操做, 系统就会经过我以前给它的回调对象来通知我。


在ASIO库中,异步方式的函数或方法名称前面都有“async_” 前缀,函数参数里会要求放一个回调函数(或仿函数)。异步操做执行 后无论有没有完成都会当即返回,这时能够作一些其它事,直到回调函数(或仿函数)被调用,说明异步操做已经完成。


在ASIO中不少回调函数都只接受一个boost::system::error_code参数,在实际使用时确定是不够的,因此通常 使用仿函数携带一堆相关数据做为回调,或者使用boost::bind来绑定一堆数据。


另外要注意的是,只有io_service类的run()方法运行以后回调对象才会被调用,不然即便系统已经完成了异步操做也不会有任 务动做。


示例代码
好了,就介绍到这里,下面是我带来的异步方式TCP Helloworld 服务器端:


#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>


using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;


struct CHelloWorld_Service
{
        CHelloWorld_Service(io_service &iosev)
                :m_iosev(iosev),m_acceptor(iosev, tcp::endpoint(tcp::v4(), 1000))
        {}


        void start()
        {
                // 开始等待链接(非阻塞)
                boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev));
                // 触发的事件只有error_code参数,因此用boost::bind把socket绑定进去
                m_acceptor.async_accept(*psocket,
                        boost::bind(&CHelloWorld_Service::accept_handler, this, psocket, _1));
        }


        // 有客户端链接时accept_handler触发
        void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec)
        {
                if(ec) return;
                // 继续等待链接
                start();
                // 显示远程IP
                std::cout << psocket->remote_endpoint().address() << std::endl;
                // 发送信息(非阻塞)
                boost::shared_ptr<std::string> pstr(new std::string("hello async world!"));
                psocket->async_write_some(buffer(*pstr),
                        boost::bind(&CHelloWorld_Service::write_handler, this, pstr, _1, _2));
        }


        // 异步写操做完成后write_handler触发
        void write_handler(boost::shared_ptr<std::string> pstr, error_code ec,
                size_t bytes_transferred)
        {
                if(ec)
                std::cout<< "发送失败!" << std::endl;
                else
                std::cout<< *pstr << " 已发送" << std::endl;
        }


        private:
                io_service &m_iosev;
                ip::tcp::acceptor m_acceptor;
};


int main(int argc, char* argv[])
{
        io_service iosev;
        CHelloWorld_Service sev(iosev);
        // 开始等待链接
        sev.start();
        iosev.run();


        return 0;
}
小结
在这个例子中,首先调用sev.start()开 始接受客户端链接。因为async_accept调 用后当即返回,start()方 法 也就立刻完成了。sev.start()在 瞬间返回后iosev.run()开 始执行,iosev.run()方法是一个循环,负责分发异步回调事件,只 有全部异步操做所有完成才会返回。


这里有个问题,就是要保证start()方法中m_acceptor.async_accept操 做所用的tcp::socket对 象 在整个异步操做期间保持有效(不 然系统底层异步操做了一半忽然发现tcp::socket没了,不是拿人家开涮嘛-_-!!!),并且客户端链接进来后这个tcp::socket对象还 有用呢。这里的解决办法是使用一个带计数的智能指针boost::shared_ptr,并把这个指针做为参数绑定到回调函数上。


一旦有客户链接,咱们在start()里给的回调函数accept_handler就会被 调用,首先调用start()继续异步等待其 它客户端的链接,而后使用绑定进来的tcp::socket对象与当前客户端通讯。


发送数据也使用了异步方式(async_write_some), 一样要保证在整个异步发送期间缓冲区的有效性,因此也用boost::bind绑定了boost::shared_ptr。


对于客户端也同样,在connect和read_some方法前加一个async_前缀,而后加入回调便可,你们本身练习写一写。


ASIO的“便民措施”
asio中提供一些便利功能,如此能够实现许多方便的操做。


端点
回到前面的客户端代码,客户端的链接很简单,主要代码就是两行:


...
// 链接
socket.connect(endpoint,ec);
...
// 通讯
socket.read_some(buffer(buf), ec);
不过链接以前咱们必须获得链接端点endpoint,也就是服务器地址、端口号以及所用的协议版本。


前面的客户端代码假设了服务器使用IPv4协议,服务器IP地址为127.0.0.1,端口号为1000。实际使用的状况是,咱们常常只能知道服务器网络ID,提供的服务类型,这时咱们就得使用ASIO提供的tcp::resolver类来取得服务器的端点了。





好比咱们要取得163网站的首页,首先就要获得“www.163.com”服务器的HTTP端点:


io_service iosev;
ip::tcp::resolver res(iosev);
ip::tcp::resolver::query query("www.163.com","80"); //www.163.com 80端口
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
这里的itr_endpoint是一个endpoint的迭代器,服务器的同一端口上可能不止一个端点,好比同时有IPv4和IPv6 两种。如今,遍历这些端点,找到可用的:


// 接上面代码
ip::tcp::resolver::iterator itr_end; //无参数构造生成end迭代器
ip::tcp::socket socket(iosev);
boost::system::error_code ec = error::host_not_found;
for(;ec && itr_endpoint!=itr_end;++itr_endpoint)
{
        socket.close();
        socket.connect(*itr_endpoint, ec);
}
若是链接上,错误码ec被清空,咱们就能够与服务器通讯了:


if(ec)
{
        std::cout << boost::system::system_error(ec).what() << std::endl;
        return -1;
}
// HTTP协议,取根路径HTTP源码
socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
for(;;)
{
        char buf[128];
        boost::system::error_code error;
        size_t len = socket.read_some(buffer(buf), error);
        // 循环取数据,直到取完为止
        if(error == error::eof)
        break;
        else if(error)
        {
                std::cout << boost::system::system_error(error).what() << std::endl;
                return -1;
        }


        std::cout.write(buf, len);
}
当全部HTTP源码下载了之后,服务器会主动断开链接,这时客户端的错误码获得boost::asio::error::eof,咱们 要根据它来断定是否跳出循环。


ip::tcp::resolver::query的构造函数接受服务器名和服务名。前面的服务名咱们直接使用了端口号"80",有时 咱们也能够使用别名,用记事本打开%windir%\system32\drivers\etc\services文件(Windows环境),能够看到 一堆别名及对应的端口,如:


echo           7/tcp                 # Echo
ftp           21/tcp                 # File Transfer Protocol (Control)
telnet        23/tcp                 # Virtual Terminal Protocol
smtp          25/tcp                 # Simple Mail Transfer Protocol
time          37/tcp  timeserver     # Time
好比要链接163网站的telnet端口(若是有的话),能够这样写:


ip::tcp::resolver::query query("www.163.com","telnet");
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
超时
在网络应用里,经常要考虑超时的问题,否则链接后半天没反应谁也受不了。


ASIO库提供了deadline_timer类来支持定时触发,它的用法是:


// 定义定时回调
void print(const boost::system::error_code& /*e*/)
{
        std::cout << "Hello, world! ";
}

deadline_timer timer;
// 设置5秒后触发回调
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(print);
这段代码执行后5秒钟时打印Hello World!


咱们能够利用这种定时机制和异步链接方式来实现超时取消:


deadline_timer timer;
// 异步链接
socket.async_connect(my_endpoint, connect_handler/*链接回调*/);
// 设置超时
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(timer_handler);
...
// 超时发生时关闭socket
void timer_handler()
{
        socket.close();
}
最后不要忘了io_service的run()方法。


统一读写接口
除了前面例子所用的tcp::socket读写方法(read_some, write_some等)之外,ASIO也提供了几个读写函数,主要有这么几个:


read、write、read_until、write_until
固然还有异步版本的


async_read、async_write、async_read_until、async_write_until
这些函数能够以统一的方式读写TCP、串口、HANDLE等类型的数据流。


咱们前面的HTTP客户端代码能够这样改写:


...
//socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
write(socket,buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
...
//size_t len = socket.read_some(buffer(buf), error);
size_t len = read(socket, buffer(buf), transfer_all() ,error);
if(len) std::cout.write(buf, len);
这个read和write有多个重载,一样,有错误码参数的不会抛出异常而无错误码参数的若出错则抛出异常。


本例中read函数里的transfer_all()是一个称为CompletionCondition的对象,表示读取/写入直接缓 冲区装满或出错为止。另外一个可选的是transfer_at_least(size_t),表示至少要读取/写入多少个字符。


read_until和write_until用于读取直到某个条件知足为止,它接受的参数再也不是buffer,而是boost::asio:: streambuf。


好比咱们能够把咱们的HTTP客户端代码改为这样:


boost::asio::streambuf strmbuf;
size_t len = read_until(socket,strmbuf," ",error);
std::istream is(&strmbuf);
is.unsetf(std::ios_base::skipws);
// 显示is流里的内容
std::copy(std::istream_iterator<char>(is),
    std::istream_iterator<char>(),
    std::ostream_iterator<char>(std::cout));
基于流的操做
对于TCP协议来讲,ASIO还提供了一个tcp::iostream。用它能够更简单地实现咱们的HTTP客户端:


ip::tcp::iostream stream("www.163.com", "80");
if(stream)
{
        // 发送数据
        stream << "GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 ";
        // 不要忽略空白字符
        stream.unsetf(std::ios_base::skipws);
        // 显示stream流里的内容
        std::copy(std::istream_iterator<char>(stream),
        std::istream_iterator<char>(),
        std::ostream_iterator<char>(std::cout));
}
用ASIO编写UDP通讯程序
ASIO的TCP协议经过boost::asio::ip名空间下的tcp类进行通讯,举一返三:ASIO的UDP协议经过boost::asio::ip名空间下的udp类进行通讯。


咱们知道UDP是基于数据报模式的,因此事先不须要创建链接。就象寄信同样,要寄给谁只要写上地址往门口的邮箱一丢,其它的事各级邮局 包办;要收信用只要看看自家信箱里有没有信件就行(或问门口传达室老大爷)。在ASIO里,就是udp::socket的send_to和receive_from方法(异步版本是async_send_to和asnync_receive_from)。


下面的示例代码是从ASIO官方文档里拿来的(实在想不出更好的例子了:-P):


服务器端代码


//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff 
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying
// file LICENSE_1_0.txt or 
// copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


std::string make_daytime_string()
{
        using namespace std; // For time_t, time and ctime;
        time_t now = time(0);
        return ctime(&now);
}


int main()
{
        try
        {
                boost::asio::io_service io_service;
                // 在本机13端口创建一个socket
                udp::socket socket(io_service, udp::endpoint(udp::v4(), 13));


                for (;;)
                {
                        boost::array<char, 1> recv_buf;
                        udp::endpoint remote_endpoint;
                        boost::system::error_code error;
                        // 接收一个字符,这样就获得了远程端点(remote_endpoint)
                        socket.receive_from(boost::asio::buffer(recv_buf),
                        remote_endpoint, 0, error);


                        if (error && error != boost::asio::error::message_size)
                                throw boost::system::system_error(error);


                        std::string message = make_daytime_string();
                        // 向远程端点发送字符串message(当前时间)    
                        boost::system::error_code ignored_error;
                        socket.send_to(boost::asio::buffer(message),
                        remote_endpoint, 0, ignored_error);
                }
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
客户端代码


//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying file LICENSE_1_0.txt or
//  copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


int main(int argc, char* argv[])
{
        try
        {
                if (argc != 2)
                {
                        std::cerr << "Usage: client <host>" << std::endl;
                        return 1;
                }


                boost::asio::io_service io_service;
                // 取得命令行参数对应的服务器端点
                udp::resolver resolver(io_service);
                udp::resolver::query query(udp::v4(), argv[1], "daytime");
                udp::endpoint receiver_endpoint = *resolver.resolve(query);


                udp::socket socket(io_service);
                socket.open(udp::v4());
                // 发送一个字节给服务器,让服务器知道咱们的地址
                boost::array<char, 1> send_buf  = { 0 };
                socket.send_to(boost::asio::buffer(send_buf), receiver_endpoint);
                // 接收服务器发来的数据
                boost::array<char, 128> recv_buf;
                udp::endpoint sender_endpoint;
                size_t len = socket.receive_from(
                boost::asio::buffer(recv_buf), sender_endpoint);


                std::cout.write(recv_buf.data(), len);
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
用ASIO读写串行口
ASIO不只支持网络通讯,还能支持串口通讯。要让两个设备使用串口通讯,关键是要设置好正确的参数,这些参数是:波特率、奇偶校验 位、中止位、字符大小和流量控制。两个串口设备只有设置了相同的参数才能互相交谈。


ASIO提供了boost::asio::serial_port类,它有一个set_option(const SettableSerialPortOption& option)方法就是用于设置上面列举的这些参数的,其中的option能够是:


serial_port::baud_rate 波特率,构造参数为unsigned int
serial_port::parity 奇偶校验,构造参数为serial_port::parity::type,enum类型,能够是none, odd, even。
serial_port::flow_control 流量控制,构造参数为serial_port::flow_control::type,enum类型,能够是none software hardware
serial_port::stop_bits 中止位,构造参数为serial_port::stop_bits::type,enum类型,能够是one onepointfive two
serial_port::character_size 字符大小,构造参数为unsigned int
演示代码
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;


int main(int argc, char* argv[])
{
        io_service iosev;
        // 串口COM1, Linux下为“/dev/ttyS0”
        serial_port sp(iosev, "COM1");
        // 设置参数
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control(serial_port::flow_control::none));
        sp.set_option(serial_port::parity(serial_port::parity::none));
        sp.set_option(serial_port::stop_bits(serial_port::stop_bits::one));
        sp.set_option(serial_port::character_size(8));
        // 向串口写数据
        write(sp, buffer("Hello world", 12));


        // 向串口读数据
        char buf[100];
        read(sp, buffer(buf));


        iosev.run();
        return 0;
}
上面这段代码有个问题,read(sp, buffer(buf))非得读满100个字符才会返回,串口通讯有时咱们确实能知道对方发过来的字符长度,有时候是不能的。


若是知道对方发过来的数据里有分隔符的话(好比空格做为分隔),能够使用read_until来读,好比:


boost::asio::streambuf buf;
// 一直读到遇到空格为止
read_until(sp, buf, ' ');
copy(istream_iterator<char>(istream(&buf)>>noskipws),
        istream_iterator<char>(),
        ostream_iterator<char>(cout));
另一个方法是使用前面说过的异步读写+超时的方式,代码以下:


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;
void handle_read(char *buf,boost::system::error_code ec,
std::size_t bytes_transferred)
{
        cout.write(buf, bytes_transferred);
}


int main(int argc, char* argv[])
{
        io_service iosev;
        serial_port sp(iosev, "COM1");
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control());
        sp.set_option(serial_port::parity());
        sp.set_option(serial_port::stop_bits());
        sp.set_option(serial_port::character_size(8));


        write(sp, buffer("Hello world", 12));


        // 异步读
        char buf[100];
        async_read(sp, buffer(buf), boost::bind(handle_read, buf, _1, _2));
        // 100ms后超时
        deadline_timer timer(iosev);
        timer.expires_from_now(boost::posix_time::millisec(100));
        // 超时后调用sp的cancel()方法放弃读取更多字符
        timer.async_wait(boost::bind(&serial_port::cancel, boost::ref(sp)));


        iosev.run();
        return 0;
}


 

usidc5 2011-07-08 18:32

asio自带的例子里是用deadline_timer的async_wait方法来实现超时的,这种方法须要单独写一个回调函数,不利于把链接和超时封装到单个函数里。传统的Winsock编程能够先把socket设为非阻塞,而后connect,再用select来判断超时,asio也能够这样作,惟一“非主流”的是asio里没有一个相似select的函数,因此得调用原始的Winsock API,也就牺牲了跨平台:前端

  1. #include <iostream>  
  2. #include <boost/asio.hpp>  
  3.    
  4. int main()  
  5. {  
  6.     boost::asio::io_service ios;  
  7.     boost::asio::ip::tcp::socket s(ios);  
  8.     boost::system::error_code ec;  
  9.    
  10.     s.open(boost::asio::ip::tcp::v4());  
  11.     // 设为非阻塞  
  12.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(true));  
  13.     // connect时必须指定error_code参数,不然会有异常抛出  
  14.     s.connect(  
  15.         boost::asio::ip::tcp::endpoint(  
  16.         boost::asio::ip::address::from_string("192.168.1.1"), 80)  
  17.         , ec);  
  18.     fd_set fdWrite;  
  19.     FD_ZERO(&fdWrite);  
  20.     FD_SET(s.native(), &fdWrite);  
  21.     timeval tv = {5};    // 5秒超时  
  22.     if (select(0, NULL, &fdWrite, NULL, &tv) <= 0   
  23.         || !FD_ISSET(s.native(), &fdWrite))  
  24.     {  
  25.         std::cout << "超时/出错啦" << std::endl;  
  26.         s.close();  
  27.         return 0;  
  28.     }  
  29.     // 设回阻塞  
  30.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(false));  
  31.     std::cout << "链接成功" << std::endl;  
  32.     s.close();  
  33.    
  34.     return 0;  

 

usidc5 2011-07-08 18:34

全部的 asio 类都只要包含头文件:   "asio.hpp"


例子1:   使用一个同步的定时器

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>   //使用时间间隔
int main()
{
//全部使用 asio 的程序都必须至少拥有一个 io_service 类型的对象. 
//它提供 I/O 功能. 
boost::asio::io_service io;
//建立一个定时器 deadline_timer 的对象. 
//这种定时器有两种状态: 超时 和 未超时.
//在超时的状态下. 调用它的 wait() 函数会当即返回. 
//在未超时的状况下则阻塞. 直到变为超时状态.
//它的构造函数参数为: 一个 io_service 对象(asio中主要提供IO的类都用io_service对象作构造函数第一个参数).
//                     超时时间.
//从建立开始. 它就进入 "未超时"状态. 且持续指定的时间. 转变到"超时"状态.
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//这里等待定时器 t 超时. 因此会阻塞5秒.
t.wait();
std::cout << "Hello, world!\n";
return 0;
}






例子2: 使用一个异步的定时器
//一个将被定时器异步调用的函数. 
void print(const boost::system::error_code& /*e*/)
{
std::cout << "Hello, world!\n";
}
int main()
{
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//和例子1不一样. 这里调用 async_wait() 执行一个异步的等待. 它注册一个可执行体(即此处的print函数).   //这里不懂的是: print的参数怎么传入?
//实际上. 这个执行体被注册到 deadline_timer 类的 io_service 成员上(即本例的 io 对象). 只有在之后调用 io.run() 时这些注册的执行体才会被真正执行. 
t.async_wait(print);
//调用 io对象的 run() 函数执行那些被注册的执行体. 
//这个函数不会当即返回. 除非和他相关的定时器对象超时而且在定时器超时后执行完全部注册的执行体. 以后才返回. 
//因此它在这里阻塞一下子. 等t超时后执行完print. 才返回.
//这里要注意的是. 调用 io.run() 能够放在其它线程中. 那样全部的回调函数都在别的线程上运行.
io.run();
return 0;
}




例子3: 向超时回调函数绑定参数

// 这个例子中. 每次 定时器超时后. 都修改定时器的状态到"未超时". 再注册回调函数. 这样循环 5 次. 因此 print 会被执行 5 次.
void print(const boost::system::error_code& /*e*/,
    boost::asio::deadline_timer* t, int* count)
{
if (*count < 5)
{
    std::cout << *count << "\n";
    ++(*count);
    //能够用 deadline_timer::expires_at() 来 获取/设置 超时的时间点. 
    //在这里咱们将超时的时间点向后推迟一秒. 
    t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
    //再次向 t 中的 io_service 对象注册一个回掉函数. 
    // 注意这里绑定时. 指定了绑定到 print 的第一个参数为: boost::asio::placeholders::error //不懂. 这个error是什么东西. 为何在例子2中不要绑定它?
    t->async_wait(boost::bind(print,
          boost::asio::placeholders::error, t, count));
}
}
int main()
{
boost::asio::io_service io;
int count = 0;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(1));
t.async_wait(boost::bind(print,
        boost::asio::placeholders::error, &t, &count));
io.run();
std::cout << "Final count is " << count << "\n";
return 0;
}



例子4: 多线程处理定时器的回掉函数. 同步的问题.
前面的例子都只在一个线程中调用 boost::asio::io_service::run() 函数. 
向定时器注册的回掉函数都是在调用该 run() 的线程中执行.
但实际上这个 run() 函数能够在多个线程中同时被调用. 例如:
boost::asio::io_service io; 
//两个定时器
boost::asio::deadline_timer t1(io, boost::posix_time::seconds(1));
t1.async_wait(func1);   
boost::asio::deadline_timer t2(io, boost::posix_time::seconds(1));
t2.async_wait(func2); 

因为向 io 注册了多个cmd. 这里为了效率咱们想让这些cmd并行执行:
boost::thread thread1(bind(&boost::asio::io_service::run, &io);
boost::thread thread2(bind(&boost::asio::io_service::run, &io);
thread1.join();
thread2.join();
这里在多个线程中调用 io.run() 因此咱们注册的cmd可能在任何一个线程中运行. 
这些线程会一直等待io对象相关的定时器超时并执行相关的 cmd. 直到全部的定时器都超时. run函数才返回. 线程才结束.
但这里有一个问题: 咱们向定时器注册的 func1 和 func2 . 它们可能会同时访问全局的对象(好比 cout ). 
这时咱们但愿对 func1 和 func2 的调用是同步的. 即执行其中一个的时候. 另外一个要等待.
这就用到了 boost::asio::strand 类. 它能够把几个cmd包装成同步执行的. 例如前面咱们向定时器注册 func1 和 func2 时. 能够改成:
boost::asio::strand the_strand;
t1.async_wait(the_strand.wrap(func1));      //包装为同步执行的
t2.async_wait(the_strand.wrap(func2)); 
这样就保证了在任什么时候刻. func1 和 func2 都不会同时在执行.

 

usidc5 2011-07-08 18:35
 
 
  • // test.cpp : 定义控制台应用程序的入口点。  
 
  • //  
 
  •   
 
  • #include "stdafx.h"  
 
  • #include <boost/asio.hpp>  
 
  • #include <boost/bind.hpp>  
 
  • #include <boost/date_time/posix_time/posix_time_types.hpp>  
 
  • #include <iostream>  
 
  •   
 
  • using namespace boost::asio;  
 
  • using boost::asio::ip::tcp;  
 
  •   
 
  • class connect_handler  
 
  • {  
 
  • public:  
 
  •     connect_handler(io_service& ios)  
 
  •         : io_service_(ios),  
 
  •         timer_(ios),  
 
  •         socket_(ios)  
 
  •     {  
 
  •         socket_.async_connect(  
 
  •             tcp::endpoint(boost::asio::ip::address_v4::loopback(), 3212),  
 
  •             boost::bind(&connect_handler::handle_connect, this,  
 
  •             boost::asio::placeholders::error));  
 
  •   
 
  •         timer_.expires_from_now(boost::posix_time::seconds(5));  
 
  •         timer_.async_wait(boost::bind(&connect_handler::close, this));  
 
  •     }  
 
  •   
 
  •     void handle_connect(const boost::system::error_code& err)  
 
  •     {  
 
  •         if (err)  
 
  •         {  
 
  •             std::cout << "Connect error: " << err.message() << "\n";  
 
  •         }  
 
  •         else  
 
  •         {  
 
  •             std::cout << "Successful connection\n";  
 
  •         }  
 
  •     }  
 
  •   
 
  •     void close()  
 
  •     {  
 
  •         socket_.close();  
 
  •     }  
 
  •   
 
  • private:  
 
  •     io_service& io_service_;  
 
  •     deadline_timer timer_;  
 
  •     tcp::socket socket_;  
 
  • };  
 
  •   
 
  • int main()  
 
  • {  
 
  •     try  
 
  •     {  
 
  •         io_service ios;  
 
  •         tcp::acceptor a(ios, tcp::endpoint(tcp::v4(), 32123), 1);  
 
  •   
 
  •         // Make lots of connections so that at least some of them will block.  
 
  •         connect_handler ch1(ios);  
 
  •         //connect_handler ch2(ios);  
 
  •         //connect_handler ch3(ios);  
 
  •         //connect_handler ch4(ios);  
 
  •         //connect_handler ch5(ios);  
 
  •         //connect_handler ch6(ios);  
 
  •         //connect_handler ch7(ios);  
 
  •         //connect_handler ch8(ios);  
 
  •         //connect_handler ch9(ios);  
 
  •   
 
  •         ios.run();  
 
  •     }  
 
  •     catch (std::exception& e)  
 
  •     {  
 
  •         std::cerr << "Exception: " << e.what() << "\n";  
 
  •     }  
 
  •   
 
  •     return 0;  
 
  • }  
 

 

usidc5 2011-07-08 19:49
服务器代码:


Servier.cpp
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using boost::asio::ip::tcp;
#define max_len 1024
class clientSession
:public boost::enable_shared_from_this<clientSession>
{
public:
clientSession(boost::asio::io_service& ioservice)
:m_socket(ioservice)
{
memset(data_,‘\0′,sizeof(data_));
}
~clientSession()
{}
tcp::socket& socket()
{
return m_socket;
}
void start()
{
boost::asio::async_write(m_socket,
boost::asio::buffer(“link successed!”),
boost::bind(&clientSession::handle_write,shared_from_this(),
boost::asio::placeholders::error));
/*async_read跟客户端同样,仍是不能进入handle_read函数,若是你能找到问题所在,请告诉我,谢谢*/
// --已经解决,boost::asio::async_read(...)读取的字节长度不能大于数据流的长度,不然就会进入
// ioservice.run()线程等待,read后面的就不执行了。
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//         boost::bind(&clientSession::handle_read,shared_from_this(),


//         boost::asio::placeholders::error));
//max_len能够换成较小的数字,就会发现async_read_some能够连续接收未收完的数据


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
private:
void handle_write(const boost::system::error_code& error)
{
if(error)
{
m_socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << data_ << std::endl;
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//     boost::bind(&clientSession::handle_read,shared_from_this(),


//     boost::asio::placeholders::error));


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
else
{
m_socket.close();
}
}
private:
tcp::socket m_socket;
char data_[max_len];
};
class serverApp
{
typedef boost::shared_ptr<clientSession> session_ptr;
public:
serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)
:m_ioservice(ioservice),
acceptor_(ioservice,endpoint)
{
session_ptr new_session(new clientSession(ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
~serverApp()
{
}
private:
void handle_accept(const boost::system::error_code& error,session_ptr& session)
{
if(!error)
{
std::cout << “get a new client!” << std::endl;
//实现对每一个客户端的数据处理


session->start();
//在这就应该看出为何要封session类了吧,每个session就是一个客户端


session_ptr new_session(new clientSession(m_ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
}
private:
boost::asio::io_service& m_ioservice;
tcp::acceptor acceptor_;
};
int main(int argc , char* argv[])
{
boost::asio::io_service myIoService;
short port = 8100/*argv[1]*/;
//咱们用的是inet4


tcp::endpoint endPoint(tcp::v4(),port);
//终端(能够看做sockaddr_in)完成后,就要accept了


serverApp sa(myIoService,endPoint);
//数据收发逻辑


myIoService.run();
return 0;
}
客户端代码:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
using boost::asio::ip::tcp;
class client
{
public:
client(boost::asio::io_service& io_service,tcp::endpoint& endpoint)
: socket(io_service)//这里就把socket实例化了
{
//链接服务端 connect
socket.async_connect(endpoint,
boost::bind(&client::handle_connect,this,boost::asio::placeholders::error)
);
memset(getBuffer,‘\0′,1024);
}
~client()
{}
private:
void handle_connect(const boost::system::error_code& error)
{
if(!error)
{
//一连上,就向服务端发送信息
boost::asio::async_write(socket,boost::asio::buffer(“hello,server!”),
boost::bind(&client::handle_write,this,boost::asio::placeholders::error));
/**读取服务端发下来的信息
*这里很奇怪,用async_read根本就不能进入handle_read函数
**/
// --已经解决,boost::asio::async_read(...)读取的字节长度不能大于数据流的长度,不然就会进入
// ioservice.run()线程等待,read后面的就不执行了。
//boost::asio::async_read(socket,
//     boost::asio::buffer(getBuffer,1024),
//     boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//    );
socket.async_read_some(boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << getBuffer << std::endl;
//boost::asio::async_read(socket,
//         boost::asio::buffer(getBuffer,1024),
//         boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//        );
//这样就能够实现循环读取了,至关于while(1)
//固然,到了这里,作过网络的朋友就应该至关熟悉了,一些逻辑就能够自行扩展了
//想作聊天室的朋友能够用多线程来实现
socket.async_read_some(
boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_write(const boost::system::error_code& error)
{
}
private:
tcp::socket socket;
char getBuffer[1024];
};
int main(int argc,char* argv[])
{
//if(argc != 3)
//{
// std::cerr << “Usage: chat_client <host> <port>\n”;
//    return 1;
//}
//我觉IO_SERVICE是一个基本性的接口,基本上一般用到的类实例都须要经过它来构造
//功能咱们能够看似socket
boost::asio::io_service io_service;
//这个终端就是服务器
//它的定义就能够看做时sockaddr_in,咱们用它来定义IP和PORT
tcp::endpoint endpoint(boost::asio::ip::address_v4::from_string("192.168.1.119"/*argv[1]*/),8100/*argv[2]*/);
//既然socket和sockaddr_in已经定义好了,那么,就能够CONNECT了
//之因此为了要把链接和数据处理封成一个类,就是为了方便管理数据,这点在服务端就会有明显的感受了
boost::shared_ptr<client> client_ptr(new client(io_service,endpoint));
//执行收发数据的函数
io_service.run();
return 0;
}
修改192.168.1.119为127.0.0.1,而后先运行server,再运行client,一切ok.

 

usidc5 2011-07-08 19:50
理论基础
许多应用程序以某种方式和外界交互,例如文件,网络,串口或者终端。某些状况下例如网络,独立IO操做须要很长时间才能完成,这对程序开发造成了一个特殊的挑战。


Boost.Asio库提供管理这些长时间操做的工具,而且不须要使用基于线程的并发模型和显式的锁。


Asio库致力于以下几点:


移植性


高负载


效率


基于已知API例如BSD sockets的模型概念


易于使用


做为进一步抽象的基础


虽然asio主要关注网络,它的异步概念也扩展到了其余系统资源,例如串口,文件等等。


主要概念和功能
基本架构(略)
Proactor设计模式:无需额外线程的并发机制(略)
这种模型感受很像aio或者iocp,而select,epoll则应该相似于Reactor。


线程和Asio


线程安全


通常来讲,并发使用不一样的对象是安全的。但并发使用同一个对象是不安全的。不过io_service等类型的并发使用是安全的。


线程池


多个线程能够同时调用io_service::run,多个线程是平等的。


内部线程


为了某些特定功能,asio内部使用了thread以模拟异步,这些thread对用户而言是不可见的。它们都符合以下原则:


它们不会直接调用任何用户代码
他们不会被任何信号中断。
注意,以下几种状况违背了原则1。


ip::basic_resolver::async_resolve() 全部平台


basic_socket::async_connect() windows平台


涉及null_buffers()的任何操做 windows平台


以上是容易理解的,asio自己尽量不建立thread,某些状况下,例如connect,因为windows 2k平台下并不提供异步connect,因此asio只能用select模拟,这种状况下不得不建立新线程。windows xp下提供connectex,但考虑到兼容性,asio彷佛并未使用。


asio彻底保证而后异步完成函数都仅在运行io_service::run的线程中被调用。


同时,建立而且管理运行io_service::run的线程是用户的责任。


Strands:使用多线程且无需显式锁


有3种方式能够显式或隐式使用锁。


只在一个线程中调用io_service::run,那么全部异步完成函数都会在该线程中串行化调用
应用逻辑保证
直接使用strand
strand::wrap能够建立一个包裹handler用于post或其余异步调用。


Buffers
Asio支持多个buffer同时用于读写,相似于WSARecv里面的WSABUF数组。mutable_buffer和const_buffer相似于WSABUF,MutableBufferSequence和ConstBufferSequence相似于WSABUF的容器。


Buffer自己不分配释放内存,该数据结构很简单。


vc8及以上的编译器在debug编译时缺省支持检查越界等问题。其余编译器能够用BOOST_ASIO_DISABLE_BUFFER_DEBUGGING打开这个开关。


流,不彻底读和不彻底写
许多io对象是基于流的,这意味着:


没有消息边界,数据是连续的字节。
读或者写操做可能仅传送了要求的部分字节,这称之为不彻底读/写。
read_some,async_read_some,write_some,async_write_some则为这种不彻底读/写。


系统API通常均为这种不彻底读写。例如WSASend,WSARecv等等。


通常来讲都须要读/写特定的字节。能够用read,async_read,write,async_write。这些函数在未完成任务以前会持续调用不彻底函数。


EOF
read,async_read,read_until,async_read_until在遇到流结束时会产生一个错误。这是很合理的,例如要求读4个字节,但仅读了1个字节socket就关闭了。在handle_read中error_code将提示一个错误。


Reactor类型的操做
有些应用程序必须集成第3方的库,这些库但愿本身执行io操做。


这种操做相似于select,考察select和aio的区别,前者是获得完成消息,而后再执行同步读操做,aio是预发异步读操做,在完成消息到来时,读操做已经完成。


null_buffer设计用来实现这类操做。


ip::tcp::socket socket(my_io_service);
...
ip::tcp::socket::non_blocking nb(true);
socket.io_control(nb);
...
socket.async_read_some(null_buffers(), read_handler);
...
void read_handler(boost::system::error_code ec)
{
  if (!ec)
  {
    std::vector<char> buf(socket.available());
    socket.read_some(buffer(buf));
  }
}
注意通常asio的用法和这明显不一样。以上代码很是相似select的方式。
常规代码是:
boost::asio::async_read(socket_,boost::asio::buffer(data,length),handle_read);
void handle_read(){…}
行操做
许多应用协议都是基于行的,例如HTTP,SMTP,FTP。为了简化这类操做,Asio提供read_until以及async_read_until。
例如:
class http_connection
{
  ...


  void start()
  {
    boost::asio::async_read_until(socket_, data_, "/r/n",
        boost::bind(&http_connection::handle_request_line, this, _1));
  }


  void handle_request_line(boost::system::error_code ec)
  {
    if (!ec)
    {
      std::string method, uri, version;
      char sp1, sp2, cr, lf;
      std::istream is(&data_);
      is.unsetf(std::ios_base::skipws);
      is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
      ...
    }
  }


  ...


  boost::asio::ip::tcp::socket socket_;
  boost::asio::streambuf data_;
};
read_until,async_read_until支持的判断类型能够是char,string以及boost::regex,它还支持自定义匹配函数。
如下例子是持续读,一直到读到空格为止:
typedef boost::asio::buffers_iterator<
    boost::asio::streambuf::const_buffers_type> iterator;


std::pair<iterator, bool>
match_whitespace(iterator begin, iterator end)
{
  iterator i = begin;
  while (i != end)
    if (std::isspace(*i++))
      return std::make_pair(i, true);
  return std::make_pair(i, false);
}
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_whitespace);



如下例子是持续读,直到读到特定字符为止:
class match_char
{
public:
  explicit match_char(char c) : c_(c) {}


  template <typename Iterator>
  std::pair<Iterator, bool> operator()(
      Iterator begin, Iterator end) const
  {
    Iterator i = begin;
    while (i != end)
      if (c_ == *i++)
        return std::make_pair(i, true);
    return std::make_pair(i, false);
  }


private:
  char c_;
};


namespace boost { namespace asio {
  template <> struct is_match_condition<match_char>
    : public boost::true_type {};
} } // namespace boost::asio
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_char('a'));



自定义内存分配
Asio不少地方都须要复制拷贝handlers,缺省状况下,使用new/delete,若是handlers提供


void* asio_handler_allocate(size_t, ...);
void asio_handler_deallocate(void*, size_t, ...);
则会调用这两个函数来进行分配和释放。
The implementation guarantees that the deallocation will occur before the associated handler is invoked, which means the memory is ready to be reused for any new asynchronous operations started by the handler.


若是在完成函数中再发起一个异步请求,那么这块内存能够重用,也就是说,若是永远仅有一个异步请求在未完成的状态,那么仅须要一块内存就足够用于asio的handler copy了。


The custom memory allocation functions may be called from any user-created thread that is calling a library function. The implementation guarantees that, for the asynchronous operations included the library, the implementation will not make concurrent calls to the memory allocation functions for that handler. The implementation will insert appropriate memory barriers to ensure correct memory visibility should allocation functions need to be called from different threads.


以上这段不很清楚,不明白多线程环境下,asio_handler_allocate是否要考虑同步问题。


Custom memory allocation support is currently implemented for all asynchronous operations with the following exceptions:


ip::basic_resolver::async_resolve() on all platforms.
basic_socket::async_connect() on Windows.
Any operation involving null_buffers() on Windows, other than an asynchronous read performed on a stream-oriented socket.

 

usidc5 2011-07-08 22:50
boost::asio是一个高性能的网络开发库,Windows下使用IOCP,Linux下使用epoll。与ACE不一样的是,它并无提供一个网络框架,而是采起组件的方式来提供应用接口。可是对于常见的状况,采用一个好用的框架仍是可以简化开发过程,特别是asio的各个异步接口的用法都至关相似。
  受到 SP Server 框架的影响,我使用asio大体实现了一个多线程的半异步半同步服务器框架,如下是利用它来实现一个Echo服务器:

1. 实现回调:

    static void onSessionStarted(RequestPtr const& request, ResponsePtr const& response) {   request->setReadMode(Session::READ_LN); // 设置为行读取}static void onSession(RequestPtr const& request, ResponsePtr const& response) {   print(request->message()); //打印收到的消息   response->addReply(request->message()); //回送消息   response->close();}

复制代码
说明:close()是一个关闭请求,它并不立刻关闭Session,而是等待全部与该Session相关的异步操做所有结束后才关闭。

2. 一个单线程的Echo服务器:
    void server_main() {unsigned short port = 7;AsioService svc;AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = &onSessionStarted;svc.callbacks().sessionHandle = &onSession;svc.run();}

复制代码
3. 一个多线程的Echo服务器(半异步半同步:一个主线程,4个工做者线程)
    void server_main2() {unsigned short port = 7;int num_threads = 4;AsioService svc;AsioService worker(AsioService::HAS_WORK);AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = worker.wrap(&onSessionStarted);svc.callbacks().sessionHandle = worker.wrap(&onSession);AsioThreadPool thr(worker, num_threads);svc.run();}

复制代码
  有了这样一个思路,实现起来就很容易了。重点是如下两点:
  1。缓冲区的管理与内存池的使用
  2。为了保证Session的线程安全,必需要设置一个挂起状态。
      
     还有一个好处,就是彻底隔绝了asio的应用接口,不用再忍受asio漫长的编译时间了。代码就不贴在这里了,有兴趣的能够经过email 探讨。(说明,这里只提出一个思路,再也不提供源代码,请各位见谅)

 

usidc5 2011-07-08 22:54
2. 同步Timer
本章介绍asio如何在定时器上进行阻塞等待(blocking wait). 
实现,咱们包含必要的头文件. 
全部的asio类能够简单的经过include "asio.hpp"来调用.
#include <iostream>
#include <boost/asio.hpp>
此外,这个示例用到了timer,咱们还要包含Boost.Date_Time的头文件来控制时间.
#include <boost/date_time/posix_time/posix_time.hpp>
使用asio至少须要一个boost::asio::io_service对象.该类提供了访问I/O的功能.咱们首先在main函数中声明它.
int main()
{
    boost::asio::io_service io;
下一步咱们声明boost::asio::deadline_timer对象.这个asio的核心类提供I/O的功能(这里更确切的说是定时功能),老是把一个io_service对象做为他的第一个构造函数,而第二个构造函数的参数设定timer会在5秒后到时(expired).
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
这个简单的示例中咱们演示了定时器上的一个阻塞等待.就是说,调用boost::asio::deadline_timer::wait()的在建立后5秒内(注意:不是等待开始后),timer到时以前不会返回任何值. 
一个deadline_timer只有两种状态:到时,未到时. 
若是boost::asio::deadline_timer::wait()在到时的timer对象上调用,会当即return.
t.wait();
最后,咱们输出理所固然的"Hello, world!"来演示timer到时了.
    std::cout << "Hello, world! ";
    return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int main()
{
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.wait();
    std::cout << "Hello, world! ";
    return 0;
}

3. 异步Timer
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
asio的异步函数会在一个异步操做完成后被回调.这里咱们定义了一个将被回调的函数.
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
这里咱们调用asio::deadline_timer::async_wait()来异步等待
t.async_wait(print);
最后,咱们必须调用asio::io_service::run(). 
asio库只会调用那个正在运行的asio::io_service::run()的回调函数. 
若是asio::io_service::run()不被调用,那么回调永远不会发生. 
asio::io_service::run()会持续工做到点,这里就是timer到时,回调完成. 
别忘了在调用 asio::io_service::run()以前设置好io_service的任务.好比,这里,若是咱们忘记先调用asio::deadline_timer::async_wait()则asio::io_service::run()会在瞬间return.
    io.run();
    return 0;
}
完整的代码:
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.async_wait(print);
    io.run();
    return 0;
}
4. 回调函数的参数
这里咱们将每秒回调一次,来演示如何回调函数参数的含义
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
首先,调整一下timer的持续时间,开始一个异步等待.显示,回调函数须要访问timer来实现周期运行,因此咱们再介绍两个新参数
指向timer的指针
一个int*来指向计数器
void print(const asio::error& /*e*/,
    asio::deadline_timer* t, int* count)
{
咱们打算让这个函数运行6个周期,然而你会发现这里没有显式的方法来终止io_service.不过,回顾上一节,你会发现当 asio::io_service::run()会在全部任务完成时终止.这样咱们当计算器的值达到5时(0为第一次运行的值),再也不开启一个新的异步等待就能够了.
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
...
而后,咱们推迟的timer的终止时间.经过在原先的终止时间上增长延时,咱们能够确保timer不会在处理回调函数所需时间内的到期. 
(原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)
t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
而后咱们开始一个新的同步等待.如您所见,咱们用把print和他的多个参数用boost::bind函数合成一个的形为void(const asio::error&)回调函数(准确的说是function object). 
在这个例子中, boost::bind的asio::placeholders::error参数是为了给回调函数传入一个error对象.当进行一个异步操做,开始 boost::bind时,你须要使用它来匹配回调函数的参数表.下一节中你会学到回调函数不须要error参数时能够省略它.
     t->async_wait(boost::bind(print,
        asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
和上面同样,咱们再一次使用了绑定asio::deadline_timer::async_wait()
t.async_wait(boost::bind(print,
    asio::placeholders::error, &t, &count));
io.run();
在结尾,咱们打印出的最后一次没有设置timer的调用的count的值
    std::cout << "Final count is " << count << " ";
    return 0;
}
完整的代码:
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/,
  bsp;     asio::deadline_timer* t, int* count)
{
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
        t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
        t->async_wait(boost::bind(print,
                    asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
    t.async_wait(boost::bind(print,
                asio::placeholders::error, &t, &count));
    io.run();
    std::cout << "Final count is " << count << " ";
    return 0;
}

5. 成员函数做为回调函数
本例的运行结果和上一节相似
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
咱们先定义一个printer类
class printer
{
public:
//构造函数有一个io_service参数,而且在初始化timer_时用到了它.用来计数的count_这里一样做为了成员变量
    printer(boost::asio::io_service& io)
        : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
boost::bind 一样能够出色的工做在成员函数上.众所周知,全部的非静态成员函数都有一个隐式的this参数,咱们须要把this做为参数bind到成员函数上.和上一节相似,咱们再次用bind构造出void(const boost::asio::error&)形式的函数. 
注意,这里没有指定boost::asio::placeholders::error占位符,由于这个print成员函数没有接受一个error对象做为参数.
timer_.async_wait(boost::bind(&printer::print, this));

在类的折构函数中咱们输出最后一次回调的count的值
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


print函数于上一节的十分相似,可是用成员变量取代了参数.
    void print()
    {
        if (count_ < 5)
        {
            std::cout << count_ << " ";
            ++count_;
            timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&printer::print, this));
        }
    }
private:
    boost::asio::deadline_timer timer_;
    int count_;
};

如今main函数清爽多了,在运行io_service以前只须要简单的定义一个printer对象.
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
    public:
        printer(boost::asio::io_service& io)
            : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer_.async_wait(boost::bind(&printer::print, this));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print()
        {
            if (count_ < 5)
            {
                std::cout << count_ << " ";
                ++count_;
                timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
                timer_.async_wait(boost::bind(&printer::print, this));
            }
        }
    private:
        boost::asio::deadline_timer timer_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}


6. 多线程回调同步
本节演示了使用boost::asio::strand在多线程程序中进行回调同步(synchronise). 
先前的几节阐明了如何在单线程程序中用boost::asio::io_service::run()进行同步.如您所见,asio库确保 仅当当前线程调用boost::asio::io_service::run()时产生回调.显然,仅在一个线程中调用 boost::asio::io_service::run() 来确保回调是适用于并发编程的. 
一个基于asio的程序最好是从单线程入手,可是单线程有以下的限制,这一点在服务器上尤为明显:
当回调耗时较长时,反应迟钝.
在多核的系统上无能为力
若是你发觉你陷入了这种困扰,能够替代的方法是创建一个boost::asio::io_service::run()的线程池.然而这样就容许回调函数并发执行.因此,当回调函数须要访问一个共享,线程不安全的资源时,咱们须要一种方式来同步操做.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
在上一节的基础上咱们定义一个printer类,这次,它将并行运行两个timer
class printer
{
public:
除了声明了一对boost::asio::deadline_timer,构造函数也初始化了类型为boost::asio::strand的strand_成员. 
boost::asio::strand 能够分配的回调函数.它保证不管有多少线程调用了boost::asio::io_service::run(),下一个回调函数仅在前一个回调函数完成后开始,固然回调函数仍然能够和那些不使用boost::asio::strand分配,或是使用另外一个boost::asio::strand分配的回调函数一块儿并发执行.
printer(boost::asio::io_service& io)
    : strand_(io),
    timer1_(io, boost::posix_time::seconds(1)),
    timer2_(io, boost::posix_time::seconds(1)),
    count_(0)
{
当一个异步操做开始时,用boost::asio::strand来 "wrapped(包装)"回调函数.boost::asio::strand::wrap()会返回一个由boost::asio::strand分配的新的handler(句柄),这样,咱们能够确保它们不会同时运行.
    timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
    timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


多线程程序中,回调函数在访问共享资源前须要同步.这里共享资源是std::cout 和count_变量. 
    void print1()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 1: " << count_ << " ";
            ++count_;
            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        }
    }
    void print2()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 2: " << count_ << " ";
            ++count_;
            timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
            timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
        }
    }
private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer1_;
    boost::asio::deadline_timer timer2_;
    int count_;
};
main函数中boost::asio::io_service::run()在两个线程中被调用:主线程、一个boost::thread线程. 
正如单线程中那样,并发的boost::asio::io_service::run()会一直运行直到完成任务.后台的线程将在全部异步线程完成后终结. 
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
public:
        printer(boost::asio::io_service& io)
            : strand_(io),
            timer1_(io, boost::posix_time::seconds(1)),
            timer2_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print1()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 1: " << count_ << " ";
                ++count_;
                timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
                timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
            }
        }
        void print2()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 2: " << count_ << " ";
                ++count_;
                timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
                timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
            }
        }
private:
        boost::asio::strand strand_;
        boost::asio::deadline_timer timer1_;
        boost::asio::deadline_timer timer2_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}



7. TCP客户端:对准时间
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
本程序的目的是访问一个时间同步服务器,咱们须要用户指定一个服务器(如time-nw.nist.gov),用IP亦可. 
(译者注:日期查询协议,这种时间传输协议不指定固定的传输格式,只要求按照ASCII标准发送数据。)
using boost::asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
            }
用asio进行网络链接至少须要一个boost::asio::io_service对象
boost::asio::io_service io_service;


咱们须要把在命令行参数中指定的服务器转换为TCP上的节点.完成这项工做须要boost::asio::ip::tcp::resolver对象
tcp::resolver resolver(io_service);


一个resolver对象查询一个参数,并将其转换为TCP上节点的列表.这里咱们把argv[1]中的sever的名字和要查询字串daytime关联.
tcp::resolver::query query(argv[1], "daytime");


节点列表能够用 boost::asio::ip::tcp::resolver::iterator 来进行迭代.iterator默认的构造函数生成一个end iterator.
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
如今咱们创建一个链接的sockert,因为得到节点既有IPv4也有IPv6的.因此,咱们须要依次尝试他们直到找到一个能够正常工做的.这步使得咱们的程序独立于IP版本
tcp::socket socket(io_service);
boost::asio::error error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
    socket.close();
    socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));
}
if (error)
    throw error;
链接完成,咱们须要作的是读取daytime服务器的响应. 
咱们用boost::array来保存获得的数据,boost::asio::buffer()会自动根据array的大小暂停工做,来防止缓冲溢出.除了使用boost::array,也能够使用char [] 或std::vector.
for (;;)
{
    boost::array<char, 128> buf;
    boost::asio::error error;
    size_t len = socket.read_some(
        boost::asio::buffer(buf), boost::asio::assign_error(error));
当服务器关闭链接时,boost::asio::ip::tcp::socket::read_some()会用boost::asio::error::eof标志完成, 这时咱们应该退出读取循环了.
if (error == boost::asio::error::eof)
    break; // Connection closed cleanly by peer.
else if (error)
    throw error; // Some other error.
std::cout.write(buf.data(), len);

若是发生了什么异常咱们一样会抛出它
}
catch (std::exception& e)
{
    std::cerr << e.what() << std::endl;
}


运行示例:在windowsXP的cmd窗口下 
输入:upload.exe time-a.nist.gov
输出:54031 06-10-23 01:50:45 07 0 0 454.2 UTC(NIST) *
完整的代码:
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>
using asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_service io_service;
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(argv[1], "daytime");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::resolver::iterator end;
        tcp::socket socket(io_service);
        asio::error error = asio::error::host_not_found;
        while (error && endpoint_iterator != end)
        {
            socket.close();
            socket.connect(*endpoint_iterator++, asio::assign_error(error));
        }
        if (error)
            throw error;
        for (;;)
        {
            boost::array<char, 128> buf;
            asio::error error;
            size_t len = socket.read_some(
                    asio::buffer(buf), asio::assign_error(error));
            if (error == asio::error::eof)
                break; // Connection closed cleanly by peer.
            else if (error)
                throw error; // Some other error.
            std::cout.write(buf.data(), len);
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

8. TCP同步时间服务器
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
咱们先定义一个函数返回当前的时间的string形式.这个函数会在咱们全部的时间服务器示例上被使用.
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
新建一个asio::ip::tcp::acceptor对象来监听新的链接.咱们监听TCP端口13,IP版本为V4
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));


这是一个iterative server,也就是说同一时间只能处理一个链接.创建一个socket来表示一个和客户端的链接, 而后等待客户端的链接.
for (;;)
{
    tcp::socket socket(io_service);
    acceptor.accept(socket);
当客户端访问服务器时,咱们获取当前时间,而后返回它.
        std::string message = make_daytime_string();
        asio::write(socket, asio::buffer(message),
            asio::transfer_all(), asio::ignore_error());
    }
}
最后处理异常
catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;

运行示例:运行服务器,而后运行上一节的客户端,在windowsXP的cmd窗口下 
输入:client.exe 127.0.0.1 
输出:Mon Oct 23 09:44:48 2006
完整的代码:
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
        for (;;)
        {
            tcp::socket socket(io_service);
            acceptor.accept(socket);
            std::string message = make_daytime_string();
            asio::write(socket, asio::buffer(message),
                    asio::transfer_all(), asio::ignore_error());
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

 

usidc5 2011-07-08 22:55


构造函数


构造函数的主要动做就是调用CreateIoCompletionPort建立了一个初始iocp。


Dispatch和post的区别


Post必定是PostQueuedCompletionStatus而且在GetQueuedCompletionStatus 以后执行。


Dispatch会首先检查当前thread是否是io_service.run/runonce/poll/poll_once线程,若是是,则直接运行。


poll和run的区别


二者代码几乎同样,都是首先检查是否有outstanding的消息,若是没有直接返回,不然调用do_one()。惟一的不一样是在调用size_t do_one(bool block, boost::system::error_code& ec)时前者block = false,后者block = true。


该参数的做用体如今:


BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,


&completion_key, &overlapped, block ? timeout : 0);


所以能够看出,poll处理的是已经完成了的消息,也即GetQueuedCompletionStatus马上能返回的。而run则会致使等待。


poll 的做用是依次处理当前已经完成了的消息,直到全部已经完成的消息处理完成为止。若是没有已经完成了得消息,函数将退出。poll不会等待。这个函数有点相似于PeekMessage。鉴于PeekMessage不多用到,poll的使用场景我也有点疑惑。poll的一个应用场景是若是但愿handler的处理有优先级,也即,若是消息完成速度很快,同时可能完成多个消息,而消息的处理过程可能比较耗时,那么能够在完成以后的消息处理函数中不真正处理数据,而是把handler保存在队列中,而后按优先级统一处理。代码以下:


while (io_service.run_one()) { 
    // The custom invocation hook adds the handlers to the priority queue 
    // rather than executing them from within the poll_one() call. 
    while (io_service.poll_one())      ;
    pri_queue.execute_all(); }


循环执行poll_one让已经完成的消息的wrap_handler处理完毕,也即插入一个队列中,而后再统一处理之。这里的wrap_handler是一个class,在post的时候,用以下代码:


io_service.post(pri_queue.wrap(0, low_priority_handler));或者 acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));


template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler) 
{    return wrapped_handler<Handler>(*this, priority, handler); }


参见boost_asio/example/invocation/prioritised_handlers.cpp


这个sample也同时表现了wrap的使用场景。


也即把handler以及参数都wrap成一个object,而后把object插入一个队列,在pri_queue.execute_all中按优先级统一处理。


run的做用是处理消息,若是有消息未完成将一直等待到全部消息完成并处理以后才退出。


reset和stop


文档中reset的解释是重置io_service以便下一次调用。


当 run,run_one,poll,poll_one是被stop掉致使退出,或者因为完成了全部任务(正常退出)致使退出时,在调用下一次 run,run_one,poll,poll_one以前,必须调用此函数。reset不能在run,run_one,poll,poll_one正在运行时调用。若是是消息处理handler(用户代码)抛出异常,则能够在处理以后直接继续调用 io.run,run_one,poll,poll_one。 例如:


boost::asio::io_service io_service;  
...  
for (;;){  
  try 
  {  
    io_service.run();  
    break; // run() exited normally  
  }  
  catch (my_exception& e)  
  {  
    // Deal with exception as appropriate.  
  }  

在抛出了异常的状况下,stopped_还没来得及被asio设置为1,因此无需调用reset。
reset函数的代码仅有一行:


void reset()  
{  
::InterlockedExchange(&stopped_, 0);  

也即,当io.stop时,会设置stopped_=1。当完成全部任务时,也会设置。


总的来讲,单线程状况下,无论io.run是如何退出的,在下一次调用io.run以前调用一次reset没有什么坏处。例如:


for(;;)  
{  
try 
{  
io.run();  
}  
catch(…)  
{  
}  
io.reset();  
}  

若是是多线程在运行io.run,则应该当心,由于reset必须是全部的run,run_one,poll,poll_one退出后才能调用。


文档中的stop的解释是中止io_service的处理循环。


此函数不是阻塞函数,也即,它仅仅只是给iocp发送一个退出消息而并非等待其真正退出。由于poll和poll_one原本就不等待(GetQueuedCompletionStatus时timeout = 0),因此此函数对poll和poll_one无心义。对于run_one来讲,若是该事件还未完成,则run_one会马上返回。若是该事件已经完成,而且还在处理中,则stop并没有特殊意义(会等待handler完成后天然退出)。对于run来讲,stop的调用会致使run中的 GetQueuedCompletionStatus马上返回。而且因为设置了stopped = 1,此前完成的消息的handlers也不会被调用。考虑一下这种状况:在io.stop以前,有1k个消息已经完成但还没有处理,io.run正在依次从 GetQueuedCompletionStatus中得到信息而且调用handlers,调用io.stop设置stopped=1将致使后许 GetQueuedCompletionStatus返回的消息直接被丢弃,直到收到退出消息并退出io.run为止。


void stop()  
{  
if (::InterlockedExchange(&stopped_, 1) == 0)  
{  
if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
{  
DWORD last_error = ::GetLastError();  
boost::system::system_error e(  
boost::system::error_code(last_error,  
boost::asio::error::get_system_category()),  
"pqcs");  
boost::throw_exception(e);  
}  
}  

注意除了让当前代码退出以外还有一个反作用就是设置了stopped_=1。这个反作用致使在stop以后若是不调用reset,全部run,run_one,poll,poll_one都将直接退出。


另外一个须要注意的是,stop会致使全部未完成的消息以及完成了但还没有处理得消息都直接被丢弃,不会致使handlers倍调用。


注意这两个函数都不会CloseHandle(iocp.handle_),那是析构函数干的事情。


注意此处有个细节:一次PostQueuedCompletionStatus仅致使一次 GetQueuedCompletionStatus返回,那么若是有多个thread此时都在io.run,而且block在 GetQueuedCompletionStatus时,调用io.stop将PostQueuedCompletionStatus而且致使一个 thread的GetQueuedCompletionStatus返回。那么其余的thread呢?进入io_service的do_one(由run 函数调用)代码能够看到,当GetQueuedCompletionStatus返回而且发现是退出消息时,会再发送一次 PostQueuedCompletionStatus。代码以下:


else 
{  
    // Relinquish responsibility for dispatching timers. If the io_service  
    // is not being stopped then the thread will get an opportunity to  
    // reacquire timer responsibility on the next loop iteration.  
    if (dispatching_timers)  
    {  
      ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);  
    }  
    // The stopped_ flag is always checked to ensure that any leftover  
    // interrupts from a previous run invocation are ignored.  

    if (::InterlockedExchangeAdd(&stopped_, 0) != 0)  
    {  
      // Wake up next thread that is blocked on GetQueuedCompletionStatus.  
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
      {  
        last_error = ::GetLastError();  
        ec = boost::system::error_code(last_error,  
            boost::asio::error::get_system_category());  
        return 0;  
      }  
      ec = boost::system::error_code();  
      return 0;  
    }  
}  

Wrap


这个函数是一个语法糖。


Void func(int a);


io_service.wrap(func)(a);


至关于io_service.dispatch(bind(func,a));


能够保存io_service.wrap(func)到g,以便在稍后某些时候调用g(a);


例如:


socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 
        boost::bind(&connection::handle_read, shared_from_this(), 
          boost::asio::placeholders::error, 
          boost::asio::placeholders::bytes_transferred)));


这是一个典型的wrap用法。注意async_read_some要求的参数是一个handler,在read_some结束后被调用。因为但愿真正被调用的handle_read是串行化的,在这里再post一个消息给io_service。以上代码相似于:


void A::func(error,bytes_transferred)  
{  
strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);  
}  
socket_.async_read_some(boost::asio::buffer(buffer_), func); 
注意1点:


io_service.dispatch(bind(func,a1,…an)),这里面都是传值,没法指定bind(func,ref(a1)…an)); 因此若是要用ref语义,则应该在传入wrap时显式指出。例如:


void func(int& i){i+=1;}  
void main()  
{  
int i = 0;  
boost::asio::io_service io;  
io.wrap(func)(boost::ref(i));  
io.run();  
printf("i=%d/n");  

固然在某些场合下,传递shared_ptr也是能够的(也许更好)。


从handlers抛出的异常的影响


当handlers抛出异常时,该异常会传递到本线程最外层的io.run,run_one,poll,poll_one,不会影响其余线程。捕获该异常是程序员本身的责任。


例如:





boost::asio::io_service io_service;  

Thread1,2,3,4()  
{  
for (;;)  
{  
try 
{  
io_service.run();  
break; // run() exited normally  
}  
catch (my_exception& e)  
{  
// Deal with exception as appropriate.  
}  
}  
}  

Void func(void)  
{  
throw 1;  
}  

Thread5()  
{  
io_service.post(func);  

注意这种状况下无需调用io_service.reset()。


这种状况下也不能调用reset,由于调用reset以前必须让全部其余线程正在调用的io_service.run退出。(reset调用时不能有任何run,run_one,poll,poll_one正在运行)


Work


有些应用程序但愿在没有pending的消息时,io.run也不退出。好比io.run运行于一个后台线程,该线程在程序的异步请求发出以前就启动了。


能够经过以下代码实现这种需求:


main()  
{  
boost::asio::io_service io_service;  
boost::asio::io_service::work work(io_service);  
Create thread 
Getchar();  
}  

Thread()  
{  
Io_service.run();  

这种状况下,若是work不被析构,该线程永远不会退出。在work不被析构得状况下就让其退出,能够调用io.stop。这将致使 io.run马上退出,全部未完成的消息都将丢弃。已完成的消息(但还没有进入handler的)也不会调用其handler函数(因为在stop中设置了 stopped_= 1)。


若是但愿全部发出的异步消息都正常处理以后io.run正常退出,work对象必须析构,或者显式的删除。


boost::asio::io_service io_service;  
auto_ptr<boost::asio::io_service::work> work(  
new boost::asio::io_service::work(io_service));  

...  

work.reset(); // Allow run() to normal exit. 
work是一个很小的辅助类,只支持构造函数和析构函数。(还有一个get_io_service返回所关联的io_service)


代码以下:


inline io_service::work::work(boost::asio::io_service& io_service)  
: io_service_(io_service)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::work(const work& other)  
: io_service_(other.io_service_)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::~work()  
{  
io_service_.impl_.work_finished();  
}  

void work_started()  
{  
::InterlockedIncrement(&outstanding_work_);  
}  

// Notify that some work has finished.  
void work_finished()  
{  
if (::InterlockedDecrement(&outstanding_work_) == 0)  
stop();  
}  
能够看出构造一个work时,outstanding_work_+1,使得io.run在完成全部异步消息后判断outstanding_work_时不会为0,所以会继续调用GetQueuedCompletionStatus并阻塞在这个函数上。


而析构函数中将其-1,并判断其是否为0,若是是,则post退出消息给GetQueuedCompletionStatus让其退出。


所以work若是析构,则io.run会在处理完全部消息以后正常退出。work若是不析构,则io.run会一直运行不退出。若是用户直接调用io.stop,则会让io.run马上退出。


特别注意的是,work提供了一个拷贝构造函数,所以能够直接在任意地方使用。对于一个io_service来讲,有多少个work实例关联,则outstanding_work_就+1了多少次,只有关联到同一个io_service的work全被析构以后,io.run才会在全部消息处理结束以后正常退出。


strand


strand是另外一个辅助类,提供2个接口dispatch和post,语义和io_service的dispatch和post相似。区别在于,同一个strand所发出的dispatch和post绝对不会并行执行,dispatch和post所包含的handlers也不会并行。所以若是但愿串行处理每个tcp链接,则在accept以后应该在该链接的数据结构中构造一个strand,而且全部dispatch/post(recv /send)操做都由该strand发出。strand的做用巨大,考虑以下场景:有多个thread都在执行async_read_some,那么因为线程调度,颇有可能后接收到的包先被处理,为了不这种状况,就只能收完数据后放入一个队列中,而后由另外一个线程去统一处理。


void connection::start()   
{   
socket_.async_read_some(boost::asio::buffer(buffer_),   
strand_.wrap(   
boost::bind(&connection::handle_read, shared_from_this(),   
boost::asio::placeholders::error,   
boost::asio::placeholders::bytes_transferred)));   

不使用strand的处理方式:


前端tcp iocp收包,而且把同一个tcp链接的包放入一个list,若是list之前为空,则post一个消息给后端vnn iocp。后端vnn iocp收到post的消息后循环从list中获取数据,而且处理,直到list为空为止。处理结束后从新调用 GetQueuedCompletionStatus进入等待。若是前端tcp iocp发现list过大,意味着处理速度小于接收速度,则再也不调用iocpRecv,而且设置标志,当vnn iocp thread处理完了当前全部积压的数据包后,检查这个标志,从新调用一次iocpRecv。


使用strand的处理方式:


前端tcp iocp收包,收到包后直接经过strand.post(on_recved)发给后端vnn iocp。后端vnn iocp处理完以后再调用一次strand.async_read_some。


这两种方式我没看出太大区别来。若是对数据包的处理的确须要阻塞操做,例如db query,那么使用后端iocp以及后端thread是值得考虑的。这种状况下,前端iocp因为仅用来异步收发数据,所以1个thread就够了。在肯定使用2级iocp的状况下,前者彷佛更为灵活,也没有增长什么开销。


值得讨论的是,若是后端多个thread都处于db query状态,那么实际上此时依然没有thread能够提供数据处理服务,所以2级iocp意义其实就在于在这种状况下,前端tcp iocp依然能够accept,以及recv第一次数据,不会致使用户connect不上的状况。在后端thread空闲以后会处理这期间的recv到的数据并在此async_read_some。


若是是单级iocp(假定handlers没有阻塞操做),多线程,那么strand的做用很明显。这种状况下,很明显应该让一个tcp链接的数据处理过程串行化。


Strand的实现原理


Strand内部实现机制稍微有点复杂。每次发出strand请求(例如 async_read(strand_.wrap(funobj1))),strand再次包裹了一次成为funobj2。在async_read完成时,系统调用funobj2,检查是否正在执行该strand所发出的完成函数(检查该strand的一个标志位),若是没有,则直接调用 funobj2。若是有,则检查是否就是当前thread在执行,若是是,则直接调用funobj2(这种状况可能发生在嵌套调用的时候,但并不产生同步问题,就像同一个thread能够屡次进入同一个critical_session同样)。若是不是,则把该funobj2插入到strand内部维护的一个队列中。

 

usidc5 2011-07-13 18:18


最近在设计一个多线程分块支持续传的http的异步客户端时, 测试部门常常发现http下载模
块退出时偶尔会卡住, 在win7系统上由为明显. 反复检查代码, 并未明显问题, 因而专门写
了一个反复退出的单元测试, 当即发现问题, 并定位在io_service的析构函数中, 奇怪的是, 
个人投递io的全部socket都早已经关闭, run线程也已经退出, 按理说, 这时的io_service的
outstanding_work_应该为0才是, 可我一看它倒是1, 因而始终在win_iocp_io_service.hpp的
shutdown_service里一直循环调用GetQueuedCompletionStatus, 从而致使没法正常退出...
很明显, 这是asio对outstanding_work_计数维护的有问题, 为了解决
问题, 因而我很快想到不使用iocp, 添加宏BOOST_ASIO_DISABLE_IOCP一切就正常了...
因为本身使用的是boost.1.45版本, 因而换了个boost.1.46.1再试试, 结果同样. 难道这么严
重的bug跨在了这两个很是重要的发行版本上而没人知道?
在官方的邮件列表中细节检查, 终于看到了某人的bug报告和我描述的状况差很少, 并且
在那我的报告了bug的次日, asio做者就发布了补丁, 但这个补丁并未更新到boost.1.45
和boost.1.46中, 唉, 这两个版本但是大版本啊, 估计受害人很多...
不过幸运的是, 我在boost的主分枝中看到了修正的代码.
下面是这个补丁内容:


From 81a6a51c0cb66de6bc77e1fa5dcd46b2794995e4 Mon Sep 17 00:00:00 2001
From: Christopher Kohlhoff <chris@kohlhoff.com>
Date: Wed, 23 Mar 2011 15:03:56 +1100
Subject: [PATCH] On Windows, ensure the count of outstanding work is decremented for
abandoned operations (i.e. operations that are being cleaned up within
the io_service destructor).


---
asio/include/asio/detail/impl/dev_poll_reactor.ipp |    2 ++
asio/include/asio/detail/impl/epoll_reactor.ipp    |    2 ++
asio/include/asio/detail/impl/kqueue_reactor.ipp   |    2 ++
asio/include/asio/detail/impl/select_reactor.ipp   |    2 ++
.../asio/detail/impl/signal_set_service.ipp        |    2 ++
asio/include/asio/detail/impl/task_io_service.ipp  |    7 +++++++
.../asio/detail/impl/win_iocp_io_service.ipp       |   11 +++++++++++
asio/include/asio/detail/task_io_service.hpp       |    4 ++++
asio/include/asio/detail/win_iocp_io_service.hpp   |    4 ++++
9 files changed, 36 insertions(+), 0 deletions(-)


diff --git a/asio/include/asio/detail/impl/dev_poll_reactor.ipp b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
index 06d89ea..2a01993 100644
--- a/asio/include/asio/detail/impl/dev_poll_reactor.ipp
+++ b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
@@ -63,6 +63,8 @@ void dev_poll_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);


// Helper class to re-register all descriptors with /dev/poll.
diff --git a/asio/include/asio/detail/impl/epoll_reactor.ipp b/asio/include/asio/detail/impl/epoll_reactor.ipp
index 22f567a..d08dedb 100644
--- a/asio/include/asio/detail/impl/epoll_reactor.ipp
+++ b/asio/include/asio/detail/impl/epoll_reactor.ipp
@@ -84,6 +84,8 @@ void epoll_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void epoll_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/kqueue_reactor.ipp b/asio/include/asio/detail/impl/kqueue_reactor.ipp
index f0cdf73..45aff60 100644
--- a/asio/include/asio/detail/impl/kqueue_reactor.ipp
+++ b/asio/include/asio/detail/impl/kqueue_reactor.ipp
@@ -74,6 +74,8 @@ void kqueue_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void kqueue_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/select_reactor.ipp b/asio/include/asio/detail/impl/select_reactor.ipp
index f4e0314..00fd9fc 100644
--- a/asio/include/asio/detail/impl/select_reactor.ipp
+++ b/asio/include/asio/detail/impl/select_reactor.ipp
@@ -81,6 +81,8 @@ void select_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void select_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/signal_set_service.ipp b/asio/include/asio/detail/impl/signal_set_service.ipp
index f0f0e78..4cde184 100644
--- a/asio/include/asio/detail/impl/signal_set_service.ipp
+++ b/asio/include/asio/detail/impl/signal_set_service.ipp
@@ -145,6 +145,8 @@ void signal_set_service::shutdown_service()
       reg = reg->next_in_table_;
     }
   }
+
+  io_service_.abandon_operations(ops);
}

void signal_set_service::fork_service(
diff --git a/asio/include/asio/detail/impl/task_io_service.ipp b/asio/include/asio/detail/impl/task_io_service.ipp
index cb585d5..0a2c6fa 100644
--- a/asio/include/asio/detail/impl/task_io_service.ipp
+++ b/asio/include/asio/detail/impl/task_io_service.ipp
@@ -230,6 +230,13 @@ void task_io_service::post_deferred_completions(
   }
}

+void task_io_service::abandon_operations(
+    op_queue<task_io_service::operation>& ops)
+{
+  op_queue<task_io_service::operation> ops2;
+  ops2.push(ops);
+}
+
std::size_t task_io_service::do_one(mutex::scoped_lock& lock,
     task_io_service::idle_thread_info* this_idle_thread)
{
diff --git a/asio/include/asio/detail/impl/win_iocp_io_service.ipp b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
index ca3125e..7aaa6b8 100644
--- a/asio/include/asio/detail/impl/win_iocp_io_service.ipp
+++ b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
@@ -262,6 +262,17 @@ void win_iocp_io_service::post_deferred_completions(
   }
}

+void win_iocp_io_service::abandon_operations(
+    op_queue<win_iocp_operation>& ops)
+{
+  while (win_iocp_operation* op = ops.front())
+  {
+    ops.pop();
+    ::InterlockedDecrement(&outstanding_work_);
+    op->destroy();
+  }
+}
+
void win_iocp_io_service::on_pending(win_iocp_operation* op)
{
   if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
diff --git a/asio/include/asio/detail/task_io_service.hpp b/asio/include/asio/detail/task_io_service.hpp
index 285d83e..654f83c 100644
--- a/asio/include/asio/detail/task_io_service.hpp
+++ b/asio/include/asio/detail/task_io_service.hpp
@@ -105,6 +105,10 @@ public:
   // that work_started() was previously called for each operation.
   ASIO_DECL void post_deferred_completions(op_queue<operation>& ops);

+  // Process unfinished operations as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
private:
   // Structure containing information about an idle thread.
   struct idle_thread_info;
diff --git a/asio/include/asio/detail/win_iocp_io_service.hpp b/asio/include/asio/detail/win_iocp_io_service.hpp
index a562834..b5d7f0b 100644
--- a/asio/include/asio/detail/win_iocp_io_service.hpp
+++ b/asio/include/asio/detail/win_iocp_io_service.hpp
@@ -126,6 +126,10 @@ public:
   ASIO_DECL void post_deferred_completions(
       op_queue<win_iocp_operation>& ops);

+  // Enqueue unfinished operation as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
   // Called after starting an overlapped I/O operation that did not complete
   // immediately. The caller must have already called work_started() prior to
   // starting the operation.
-- 
1.7.0.1


注: 在boost.asio中, 使用这个补丁时, 须要将ASIO_DECL 改为 BOOST_ ASIO_DECL 


这是我第二次在使用asio的过程当中, 发现的比较严重的bug了, 不过幸运的是, 每一次都能在官方的论坛 
或邮件列表中找到解决方案. 


结论, 再牛的人写的代码也会有bug, 我的很是崇拜asio的做者. 

 

usidc5 2011-09-30 22:59
在win32平台上,asio是基于IOCP技术实现的,我之前也用过IOCP,却没想到竟然能扩展成这样,真是神奇!在其余平台下还会有别的方法去实现,具体见io_service类下面这部分的源码:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
   typedef detail::win_iocp_io_service impl_type;
   friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
   typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
   typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
   typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
   typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif
这部分代码其实就在boost::asio::io_service类声明中的最前面几行,能够看见在不一样平台下,io_service类的实现将会不一样。很显然,windows平台下固然是win_iocp_io_service类为实现了(不过我一开始还觉得win_iocp_io_service是直接拿出来用的呢,还在疑惑这样怎么有移植性呢?官方文档也对该类只字不提,其实我卡壳就是卡在这里了,差点就直接用这个类了^_^!)。

那么就分析一下win_iocp_io_service的代码吧,这里彻底是用IOCP来路由各类任务,你们使用post来委托任务,内部调用的实际上是IOCP的PostQueuedCompletionStatus函数,而后线程们用run来接受任务,内部实际上是阻塞在IOCP的GetQueuedCompletionStatus函数上,一旦有了任务就当即返回,执行完后再一个循环,继续阻塞在这里等待下一个任务的到来,这种设计思想堪称神奇,对线程、服务以及任务彻底解耦,灵活度达到了如此高度,不愧为boost库的东西!我只能有拜的份了...

说一下整体的设计思想,其实io_service就像是劳工中介所,而一个线程就是一个劳工,而调用post的模块至关于富人们,他们去中介所委托任务,而劳工们就听候中介所的调遣去执行这些任务,任务的内容就写在富人们给你的handler上,也就是函数指针,指针指向具体实现就是任务的实质内容。其实在整个过程当中,富人们都不知道是哪一个劳工帮他们作的工做,只知道是中介所负责完成这些就能够了。这使得逻辑上的耦合降到了最低。不过这样的比喻也有个不恰当的地方,若是硬要这样比喻的话,我只能说:其实劳工里面也有不少富人的^o^! 。不少劳工在完成任务的过程当中本身也托给中介所一些任务,而后这些任务极可能仍是本身去完成。这也难怪,运行代码的老是这些线程,那么调用post的确定也会有这些线程了,不过无论怎么说,如此循环往复能够解决问题就行,比喻不见得就得恰当,任何事物之间都不可能彻底相同,只要能阐述思想就行。

最后还要说明的一点就是:委托的任务其实能够设定执行的时间的,很不错的设定,内部实现则是经过定时器原理,GetQueuedCompletionStatus有一个等待时间的参数彷佛被用在这方面,还有源码中的定时器线程我并无过多的去理解,总之大致原理已基本掌握,剩下的就是使劲的用它了!!!

另外为了方便人交流,在这里插入一些代码可能更容易让人理解吧,
下面这个是启动服务时的代码:
void ServerFramework::run()
{
     boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
         workers.create_thread(
             boost::bind(&boost::asio::io_service::run, &mIoService));
     workers.join_all();
}

在打开前就得分配好任务,不然线程们运行起来就退出了,阻塞不住,任务的分配就交给open函数了,它是分配了监听端口的任务,一旦有了链接就会抛出一个任务,其中一个线程就会开始行动啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
     boost::asio::ip::tcp::resolver resolver(mIoService);
     boost::asio::ip::tcp::resolver::query query(address, port);
     boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

     mAcceptor.open(endpoint.protocol());
     mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
     mAcceptor.bind(endpoint);
     mAcceptor.listen();

     mNextConnection = new Connection(this);
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

     mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
         mWorkerCount = 4;
     }
}

open函数中给io_service的一个任务就是在有连接访问服务器端口的状况下执行ServerFramework::__onConnect函数,有一点须要格外注意的,io_service必须时刻都有任务存在,不然线程io_service::run函数将返回,因而线程都会结束并销毁,程序将退出,因此,你必须保证不管什么时候都有任务存在,这样线程们即便空闲了也仍是会继续等待,不会销毁。因此,我在ServerFramework::__onConnect函数中又一次给了io_service相同的任务,即:继续监听端口,有连接了仍是调用ServerFramework::__onConnect函数。若是你在ServerFramework::__onConnect执行完了尚未给io_service任务的话,那么一切都晚了...... 代码以下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
         MOELOG_DETAIL_WARN(e.message().c_str());
     }

     Connection* p = mNextConnection;
     mNextConnection = new Connection(this);

    // 再次进入监听状态
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

    // 处理当前连接
     __addConnection(p);
     p->start();
}


最后,展现一下这个类的全部成员变量吧:react

 

    // 用于线程池异步处理的核心对象
     boost::asio::io_service mIoService;

    // 网络连接的接收器,用于接收请求进入的连接
     boost::asio::ip::tcp::acceptor mAcceptor;

    // 指向下一个将要被使用的连接对象
     Connection* mNextConnection;

    // 存储服务器连接对象的容器
     ConnectionSet mConnections;

    //// 为连接对象容器准备的strand,防止并行调用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;

    // 为连接对象容器准备的同步锁,防止并行调用mConnections
     boost::mutex mMutex4ConnSet;

    // 为控制台输出流准备的strand,防止并行调用std::cout
     AsioService::strand mStrand_ConsoleIostream;

    // 工做线程的数量
     uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
花了足足3天时间,外加1天心情休整,终于在第5天编写出了一个能运行的基于asio和thread_group的框架,差点没气晕过去,把源码都看懂了才感受会用了。
测试了一下,debug下一万次回应耗时800+毫秒,release下是200+毫秒,机器配置双核2.5G英特尔,4个线程并行工做,无错的感受真好,不再用担忧iocp出一些奇怪的问题啦,由于是巨人们写的实现,呵呵。


进入正题,简要说一下asio的实现原理吧。在win32平台上,asio是基于IOCP技术实现的,我之前也用过IOCP,却没想到竟然能扩展成这样,真是神奇!在其余平台下还会有别的方法去实现,具体见io_service类下面这部分的源码:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::win_iocp_io_service impl_type;
  friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
  typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
  typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
  typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
  typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif


这部分代码其实就在boost::asio::io_service类声明中的最前面几行,能够看见在不一样平台下,io_service类的实现将会不一样。很显然,windows平台下固然是win_iocp_io_service类为实现了(不过我一开始还觉得win_iocp_io_service是直接拿出来用的呢,还在疑惑这样怎么有移植性呢?官方文档也对该类只字不提,其实我卡壳就是卡在这里了,差点就直接用这个类了^_^!)。


那么就分析一下win_iocp_io_service的代码吧,这里彻底是用IOCP来路由各类任务,你们使用post来委托任务,内部调用的实际上是IOCP的PostQueuedCompletionStatus函数,而后线程们用run来接受任务,内部实际上是阻塞在IOCP的GetQueuedCompletionStatus函数上,一旦有了任务就当即返回,执行完后再一个循环,继续阻塞在这里等待下一个任务的到来,这种设计思想堪称神奇,对线程、服务以及任务彻底解耦,灵活度达到了如此高度,不愧为boost库的东西!我只能有拜的份了...


说一下整体的设计思想,其实io_service就像是劳工中介所,而一个线程就是一个劳工,而调用post的模块至关于富人们,他们去中介所委托任务,而劳工们就听候中介所的调遣去执行这些任务,任务的内容就写在富人们给你的handler上,也就是函数指针,指针指向具体实现就是任务的实质内容。其实在整个过程当中,富人们都不知道是哪一个劳工帮他们作的工做,只知道是中介所负责完成这些就能够了。这使得逻辑上的耦合降到了最低。不过这样的比喻也有个不恰当的地方,若是硬要这样比喻的话,我只能说:其实劳工里面也有不少富人的^o^! 。不少劳工在完成任务的过程当中本身也托给中介所一些任务,而后这些任务极可能仍是本身去完成。这也难怪,运行代码的老是这些线程,那么调用post的确定也会有这些线程了,不过无论怎么说,如此循环往复能够解决问题就行,比喻不见得就得恰当,任何事物之间都不可能彻底相同,只要能阐述思想就行。


最后还要说明的一点就是:委托的任务其实能够设定执行的时间的,很不错的设定,内部实现则是经过定时器原理,GetQueuedCompletionStatus有一个等待时间的参数彷佛被用在这方面,还有源码中的定时器线程我并无过多的去理解,总之大致原理已基本掌握,剩下的就是使劲的用它了!!!


另外为了方便人交流,在这里插入一些代码可能更容易让人理解吧,
下面这个是启动服务时的代码:
void ServerFramework::run()
{
    boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
        workers.create_thread(
            boost::bind(&boost::asio::io_service::run, &mIoService));
    workers.join_all();
}


在打开前就得分配好任务,不然线程们运行起来就退出了,阻塞不住,任务的分配就交给open函数了,它是分配了监听端口的任务,一旦有了链接就会抛出一个任务,其中一个线程就会开始行动啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    boost::asio::ip::tcp::resolver resolver(mIoService);
    boost::asio::ip::tcp::resolver::query query(address, port);
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);


    mAcceptor.open(endpoint.protocol());
    mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    mAcceptor.bind(endpoint);
    mAcceptor.listen();


    mNextConnection = new Connection(this);
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
        mWorkerCount = 4;
    }
}


open函数中给io_service的一个任务就是在有连接访问服务器端口的状况下执行ServerFramework::__onConnect函数,有一点须要格外注意的,io_service必须时刻都有任务存在,不然线程io_service::run函数将返回,因而线程都会结束并销毁,程序将退出,因此,你必须保证不管什么时候都有任务存在,这样线程们即便空闲了也仍是会继续等待,不会销毁。因此,我在ServerFramework::__onConnect函数中又一次给了io_service相同的任务,即:继续监听端口,有连接了仍是调用ServerFramework::__onConnect函数。若是你在ServerFramework::__onConnect执行完了尚未给io_service任务的话,那么一切都晚了...... 代码以下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
        MOELOG_DETAIL_WARN(e.message().c_str());
    }


    Connection* p = mNextConnection;
    mNextConnection = new Connection(this);


    // 再次进入监听状态
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    // 处理当前连接
    __addConnection(p);
    p->start();
}


最后,展现一下这个类的全部成员变量吧:
    // 用于线程池异步处理的核心对象
    boost::asio::io_service mIoService;


    // 网络连接的接收器,用于接收请求进入的连接
    boost::asio::ip::tcp::acceptor mAcceptor;


    // 指向下一个将要被使用的连接对象
    Connection* mNextConnection;


    // 存储服务器连接对象的容器
    ConnectionSet mConnections;


    //// 为连接对象容器准备的strand,防止并行调用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;


    // 为连接对象容器准备的同步锁,防止并行调用mConnections
    boost::mutex mMutex4ConnSet;


    // 为控制台输出流准备的strand,防止并行调用std::cout
    AsioService::strand mStrand_ConsoleIostream;


    // 工做线程的数量
    uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
boost的官方例子,有单线程的网络框架,httpserver2是线程池的。下面参照网上某人的代码修改了一点(忘了哪位大仙的代码了)
测试工具,适用stressmark,测试效果很是好, 9000个/s
复制代码
#include <stdio.h>
#include "AuthenHandle.h"
#include "configure.h"
#ifdef WIN32 //for windows nt/2000/xp


#include <winsock.h>
#include <windows.h>
#include "gelsserver.h"
#pragma comment(lib,"Ws2_32.lib")
#else         //for unix




#include <sys/socket.h>
//    #include <sys/types.h>


//    #include <sys/signal.h>


//    #include <sys/time.h>


#include <netinet/in.h>     //socket


//    #include <netdb.h>


#include <unistd.h>            //gethostname


// #include <fcntl.h>


#include <arpa/inet.h>


#include <string.h>            //memset


typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef struct sockaddr SOCKADDR;
#ifdef M_I386
typedef int socklen_t;
#endif


#define BOOL             int
#define INVALID_SOCKET    -1
#define SOCKET_ERROR     -1
#define TRUE             1
#define FALSE             0
#endif        //end #ifdef WIN32








static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;




#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>


using namespace std;
using boost::asio::ip::tcp;


class io_service_pool
    : public boost::noncopyable
{
public:


    explicit io_service_pool(std::size_t pool_size)
        : next_io_service_(0)
    { 
        for (std::size_t i = 0; i < pool_size; ++ i)
        {
            io_service_sptr io_service(new boost::asio::io_service);
            work_sptr work(new boost::asio::io_service::work(*io_service));
            io_services_.push_back(io_service);
            work_.push_back(work);
        }
    }


    void start()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            boost::shared_ptr<boost::thread> thread(new boost::thread(
                boost::bind(&boost::asio::io_service::run, io_services_)));
            threads_.push_back(thread);
        }
    }


    void join()
    {
        for (std::size_t i = 0; i < threads_.size(); ++ i)
        {
            threads_->join();
        } 
    }


    void stop()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            io_services_->stop();
        }
    }


    boost::asio::io_service& get_io_service()
    {
        boost::mutex::scoped_lock lock(mtx);
        boost::asio::io_service& io_service = *io_services_[next_io_service_];
        ++ next_io_service_;
        if (next_io_service_ == io_services_.size())
        {
            next_io_service_ = 0;
        }
        return io_service;
    }


private:
    typedef boost::shared_ptr<boost::asio::io_service> io_service_sptr;
    typedef boost::shared_ptr<boost::asio::io_service::work> work_sptr;
    typedef boost::shared_ptr<boost::thread> thread_sptr;


    boost::mutex mtx;


    std::vector<io_service_sptr> io_services_;
    std::vector<work_sptr> work_;
    std::vector<thread_sptr> threads_; 
    std::size_t next_io_service_;
};


boost::mutex cout_mtx;
int packet_size = 0;
enum {MAX_PACKET_LEN = 4096};


class session
{
public:
    session(boost::asio::io_service& io_service)
        : socket_(io_service)
        , recv_times(0)
    {
    }


    virtual ~session()
    {
        boost::mutex::scoped_lock lock(cout_mtx);
    }


    tcp::socket& socket()
    {
        return socket_;
    }


    inline void start()
    {


        socket_.async_read_some(boost::asio::buffer(data_, MAX_PACKET_LEN),
            boost::bind(&session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }


    void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
    {
        if (!error)
        {
            ++ recv_times;




            count111 ++;


            struct tm *today;
            time_t ltime;
            time( &nowtime );


            if(nowtime != oldtime){
                printf("%d\n", count111);
                oldtime = nowtime;
                count111 = 0;
            }




            boost::asio::async_write(socket_, boost::asio::buffer(data_, bytes_transferred),
                boost::bind(&session::handle_write, this, boost::asio::placeholders::error));






        }
        else
        {
            delete this;
        }
    }


    void handle_write(const boost::system::error_code& error)
    {
        if (!error)
        {
            start();
        }
        else
        {
            delete this;
        }
    }


private:
    tcp::socket socket_;
    char data_[MAX_PACKET_LEN];
    int recv_times;
};


class server
{
public:
    server(short port, int thread_cnt)
        : io_service_pool_(thread_cnt)
        , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
    {
        session* new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void handle_accept(session* new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            new_session->start();
        }
        else
        {
            delete new_session;
        }


        new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void run()
    {
        io_service_pool_.start();
        io_service_pool_.join();
    }


private:


    io_service_pool io_service_pool_;
    tcp::acceptor acceptor_;
};






int main()
{


    //boost


    server s(port, 50);
    s.run();


    while(true)
    {
        sleep(1000);




     }


    return 0;
}
复制代码


 

usidc5 2013-10-07 16:42
网上大部分人都讲boost.asio用完成端口实现,而且实现了线程池,因此效率很是的高。
      我在应用asio的时候发现完成端口是有,可是线程池确并不存在,并且在现有的架构下,要想用线程池来实现对数据的处理,可能写出来不是很好看。
asio经过开启线程调用io_service::run再调用win_iocp_io_service::run来处理收到的事件。
  size_t run(boost::system::error_code& ec)
  {
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
    {
      ec = boost::system::error_code();
      return 0;
    }

    call_stack<win_iocp_io_service>::context ctx(this);

    size_t n = 0;
    while (do_one(true, ec))
      if (n != (std::numeric_limits<size_t>::max)())
        ++n;
    return n;
  }
do_one里面为
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
          &completion_key, &overlapped, block ? timeout : 0);
        operation* op = static_cast<operation*>(overlapped);
        op->do_completion(last_error, bytes_transferred);
实际上若是op->do_completion里面有时间比较长的操做,这个线程一样为死在这个地方。
由于只有一个线程在驱动前面的run函数。
固然你也能够经过同时启动几个线程来调用run函数,这样是可行的,可是这种手法确很笨拙,由于你可能一下启动10个线程,却只有一个线程比较忙,
或者你的10个线程根本就忙不过来,这根有没有使用iocp彻底没什么两样。
     作事情要力求完美,不要觉得NB的大师不提供的东西,你就不能自已弄一个。其实我以为asio里面c++的运用,很是的完美,可是从实用的角度来讲,
还不如我之前一个同事写的iocp写得好。

咱们怎么对asio这部分进行改良,让他支持线程池的方式呢。
实际上咱们只须要对win_iocp_io_service进行一些加工便可。
在do_one里面
op->do_completion(last_error, bytes_transferred);
以前auto_work work(*this);
这个地方,实际上就是来计算当前有多少工做要作,
这个地方调用work_started
  ::InterlockedIncrement(&outstanding_work_);
只须要在这按照你的需求加入一个线程就能够了。
算法自已想吧,还存在work_finished函数,能够用来减小线程。
须要给win_iocp_io_service类增长一个thread_group成员变量,供上面使用。
改良的方式不是很好,也比较很差看,
唉,完美只存在内心,适可而止吧。

 

usidc5 2013-10-07 16:43
正如其名字,asio是一个异步网络库。但第一次使用它倒是把它做为一个线程池的实现。下面是一段实验代码。


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
void foo() {
  sleep(1);
  printf("foo: %d\n", (int)pthread_self());
}




class TaskPool {
  typedef boost::shared_ptr<boost::thread> Thread;
  public:
  TaskPool(std::size_t num_workers) : num_workers_(num_workers) {
  }
  void Start() {
    manage_thread_.reset(new boost::thread(boost::bind(TaskPool::_Start, this)));
  }
  void Post() {
    ios_.post(foo);
  }
  private:
    static void _Start(TaskPool* pool) {
    for (std::size_t i = 0; i < pool->num_workers_; ++i) {
      pool->workers_.push_back(Thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &pool->ios_))));
    }
    for (std::size_t i = 0; i < pool->workers_.size(); ++i) {
      pool->workers_->join();
    }


  }
  private:
  std::size_t num_workers_;
  boost::asio::io_service ios_;
  std::vector<Thread> workers_;
  Thread manage_thread_;
};


int main()
{
  TaskPool pool(8);
  pool.Start();
  for (int i = 0; i < 1000; ++i) {
  pool.Post();
  }
  printf("post finished\n");
  sleep(10);
  return 0;
}

 

usidc5 2013-11-16 22:52
ACE是一个很成熟的中间件产品,为自适应通信环境,但它过于宏大,一堆的设计模式,架构是一层又一层,对初学者来讲,有点困难。
ASIO是基本Boost开发的异步IO库,封装了Socket,简化基于socket程序的开发。


最近分析ASIO的源代码,让我无不惊呀于它设计。在ACE中开发中的内存管理一直让人头痛,ASIO的出现,让我看到新的曙光,成为我新的好伙伴。简单地与ACE作个比较。


1.层次架构:
ACE底层是C风格的OS适配层,上一层基于C++的wrap类,再上一层是一些框架(Accpetor, Connector,Reactor等),最上一层是框架上服务。
ASIO与之相似,底层是OS的适配层,上一层一些模板类,再上一层模板类的参数化(TCP/UDP),再上一层是服务,它只有一种框架为io_service。


2.涉及范围:
ACE包含了日志,IPC,线程,共享内存,配置服务等。
ASIO只涉及到Socket,提供简单的线程操做。


3.设计模式:
ACE主要应用了Reactor,Proactor等。
而ASIO主要应用了Proactor。


4.线程调度:
ACE的Reactor是单线程调度,Proactor支持多线程调度。
ASIO支持单线程与多线程调度。


5.事件分派处理:
ACE主要是注册handler类,当事件分派时,调用其handler的虚挂勾函数。实现ACE_Handler/ACE_Svc_Handler/ACE_Event_handler等类的虚函数。
ASIO是基于函数对象的hanlder事件分派。任何函数均可能成功hanlder,少了一堆虚表的维护,调度上优于ACE。


6.发布方式:
ACE是开源免费的,不依赖于第3方库, 通常应用使用它时,以动态连接的方式发布动态库。
ASIO是开源免费的,依赖Boost,应用使用时只要include头文件,不需动态库。


7.可移植性:
ACE支持多种平台,可移植性不存在问题,听说socket编程在linux下有很多bugs。
ASIO支持多种平台,可移植性不存在问题。


8.开发难度:
基于ACE开发应用,对程序员要求比较高,要用好它,必须很是了解其框架。在其框架下开发,每每new出一个对象,不知在什么地方释放好。
基于ASIO开发应用,要求程序员熟悉函数对象,函数指针,熟悉boost库中的boost::bind。内存管理控制方便。




我我的以为,若是应用socket编程,使用ASIO开发比较好,开发效率比较高。ACE适合于理论研究,它原本就是源于Douglas的学术研究。

 

usidc5 2013-11-16 22:53
在使用IOCP时,最重要的几个API就是GetQueueCompeltionStatus、WSARecv、WSASend,数据的I/O及其完成状态经过这几个接口获取并进行后续处理。


GetQueueCompeltionStatus attempts to dequeue an I/O completion packet from the specified I/O completion port. If there is no completion packet queued, the function waits for a pending I/O operation associated with the completion port to complete.


BOOL WINAPI GetQueuedCompletionStatus(
  __in   HANDLE CompletionPort,
  __out  LPDWORD lpNumberOfBytes,
  __out  PULONG_PTR lpCompletionKey,
  __out  LPOVERLAPPED *lpOverlapped,
  __in   DWORD dwMilliseconds
);
If the function dequeues a completion packet for a successful I/O operation from the completion port, the return value is nonzero. The function stores information in the variables pointed to by the lpNumberOfBytes, lpCompletionKey, and lpOverlapped parameters.


除了关心这个API的in & out(这是MSDN开头的几行就能够告诉咱们的)以外,咱们更加关心不一样的return & out意味着什么,由于因为各类已知或未知的缘由,咱们的程序并不老是有正确的return & out。


If *lpOverlapped is NULL and the function does not dequeue a completion packet from the completion port, the return value is zero. The function does not store information in the variables pointed to by the lpNumberOfBytes and lpCompletionKey parameters. To get extended error information, call GetLastError. If the function did not dequeue a completion packet because the wait timed out, GetLastError returns WAIT_TIMEOUT.


假设咱们指定dwMilliseconds为INFINITE。


这里常见的几个错误有:


WSA_OPERATION_ABORTED (995): Overlapped operation aborted.


因为线程退出或应用程序请求,已放弃I/O 操做。


MSDN: An overlapped operation was canceled due to the closure of the socket, or the execution of the SIO_FLUSH command in WSAIoctl. Note that this error is returned by the operating system, so the error number may change in future releases of Windows.


成因分析:这个错误通常是因为peer socket被closesocket或者WSACleanup关闭后,针对这些socket的pending overlapped I/O operation被停止。


解决方案:针对socket,通常应该先调用shutdown禁止I/O操做后再调用closesocket关闭。


严重程度:轻微易处理。


WSAENOTSOCK (10038): Socket operation on nonsocket.


MSDN: An operation was attempted on something that is not a socket. Either the socket handle parameter did not reference a valid socket, or for select, a member of an fd_set was not valid.


成因分析:在一个非套接字上尝试了一个操做。


使用closesocket关闭socket以后,针对该invalid socket的任何操做都会得到该错误。


解决方案:若是是多线程存在对同一socket的操做,要保证对socket的I/O操做逻辑上的顺序,作好socket的graceful disconnect。


严重程度:轻微易处理。


WSAECONNRESET (10054): Connection reset by peer.


远程主机强迫关闭了一个现有的链接。


MSDN: An existing connection was forcibly closed by the remote host. This normally results if the peer application on the remote host is suddenly stopped, the host is rebooted, the host or remote network interface is disabled, or the remote host uses a hard close (see setsockopt for more information on the SO_LINGER option on the remote socket). This error may also result if a connection was broken due to keep-alive activity detecting a failure while one or more operations are in progress. Operations that were in progress fail with WSAENETRESET. Subsequent operations fail with WSAECONNRESET.


成因分析:在使用WSAAccpet、WSARecv、WSASend等接口时,若是peer application忽然停止(缘由如上所述),往其对应的socket上投递的operations将会失败。


解决方案:若是是对方主机或程序意外停止,那就只有各安天命了。但若是这程序是你写的,而你只是hard close,那就由不得别人了。至少,你要知道这样的错误已经出现了,就不要再费劲的继续投递或等待了。


严重程度:轻微易处理。


WSAECONNREFUSED (10061): Connection refused.


因为目标机器积极拒绝,没法链接。


MSDN: No connection could be made because the target computer actively refused it. This usually results from trying to connect to a service that is inactive on the foreign host—that is, one with no server application running.


成因分析:在使用connect或WSAConnect时,服务器没有运行或者服务器的监听队列已满;在使用WSAAccept时,客户端的链接请求被condition function拒绝。


解决方案:Call connect or WSAConnect again for the same socket. 等待服务器开启、监听空闲或查看被拒绝的缘由。是否是长的丑或者钱没给够,要不就是服务器拒绝接受天价薪酬自主创业去了?


严重程度:轻微易处理。


WSAENOBUFS (10055): No buffer space available.


因为系统缓冲区空间不足或列队已满,不能执行套接字上的操做。


MSDN: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.


成因分析:这个错误是我查看错误日志后,最在乎的一个错误。由于服务器对于消息收发有明确限制,若是缓冲区不足应该早就处理了,不可能待到send/recv失败啊。并且这个错误在以前的版本中几乎没有出现过。这也是这篇文章的主要内容。像connect和accept由于缓冲区空间不足均可以理解,并且危险不高,但若是send/recv形成拥堵并恶性循环下去,麻烦就大了,至少说明以前的验证逻辑有疏漏。


WSASend失败的缘由是:The Windows Sockets provider reports a buffer deadlock. 这里提到的是buffer deadlock,显然是因为多线程I/O投递不当引发的。


解决方案:在消息收发前,对最大挂起的消息总的数量和容量进行检验和控制。


严重程度:严重。


本文主要参考MSDN。

 

usidc5 2013-11-16 22:59
1:在IOCP中投递WSASend返回WSA_IO_PENDING的时候,表示异步投递已经成功,可是稍后发送才会完成。这其中涉及到了三个缓冲区。
网卡缓冲区,TCP/IP层缓冲区,程序缓冲区。
状况一:调用WSASend发送正确的时候(即当即返回,且没有错误),TCP/IP将数据从程序缓冲区中拷贝到TCP/IP层缓冲区中,而后不锁定该程序缓冲区,由上层程序本身处理。TCP/IP层缓冲区在网络合适的时候,将其数据拷贝到网卡缓冲区,进行真正的发送。
状况二:调用WSASend发送错误,可是错误码是WSA_IO_PENDING的时候,表示此时TCP/IP层缓冲区已满,暂时没有剩余的空间将程序缓冲区的数据拷贝出来,这时系统将锁定用户的程序缓冲区,按照书上说的WSASend指定的缓冲区将会被锁定到系统的非分页内存中。直到TCP/IP层缓冲区有空余的地方来接受拷贝咱们的程序缓冲区数据才拷贝走,并将给IOCP一个完成消息。
状况三:调用WSASend发送错误,可是错误码不是WSA_IO_PENDING,此时应该是发送错误,应该释放该SOCKET对应的全部资源。


2:在IOCP中投递WSARecv的时候,状况类似。
状况一:调用WSARecv正确,TCP/IP将数据从TCP/IP层缓冲区拷贝到缓冲区,而后由咱们的程序自行处理了。清除TCP/IP层缓冲区数据。
状况二:调用WSARecv错误,可是返回值是WSA_IO_PENDING,此时是由于TCP/IP层缓冲区中没有数据可取,系统将会锁定咱们投递的WSARecv的buffer,直到TCP/IP层缓冲区中有新的数据到来。
状况三:调用WSARecv错误,错误值不是WSA_IO_PENDING,此时是接收出错,应该释放该SOCKET对应的全部资源。


在以上状况中有几个很是要注意的事情:
系统锁定非分页内存的时候,最小的锁定大小是4K(固然,这个取决于您系统的设置,也能够设置小一些,在注册表里面能够改,固然我想这些数值微软应该比咱们更知道什么合适了),因此当咱们投递了不少WSARecv或者WSASend的时候,无论咱们投递的Buffer有多大(0除外),系统在出现IO_PENGDING的时候,都会锁定咱们4K的内存。这也就是常常有开发者出现WSANOBUF的状况缘由了。


咱们在解决这个问题的时候,要针对WSASend和WSARecv作处理
1:投递WSARecv的时候,能够采用一个巧妙的设计,先投递0大小Buf的WSARecv,若是返回,表示有数据能够接收,咱们开启真正的recv将数据从TCP/IP层缓冲区取出来,直到WSA_IO_PENGDING.
2:对投递的WSARecv以及WSASend进行计数统计,若是超过了咱们预约义的值,就不进行WSASend或者WSARecv投递了。
3:如今咱们应该就能够明白为何WSASend会返回小于咱们投递的buffer空间数据值了,是由于TCP/IP层缓冲区小于咱们要发送的缓冲区,TCP/IP只会拷贝他剩余可被Copy的缓冲区大小的数据走,而后给咱们的WSASend的已发送缓冲区设置为移走的大小,下一次投递的时候,若是TCP/IP层还未被发送,将返回WSA_IO_PENGDING。
4:在不少地方有提到,能够关闭TCP/IP层缓冲区,能够提升一些效率和性能,这个从上面的分析来看,有这个可能,要实际的网络状况去实际分析了。







==================


关于数据包在应用层乱序问题就很少说了(IOCP荒废了TCP在传输层辛辛苦苦保证的有序)。


这可有可无,由于iocp要管理上千个SOCKET,每一个SOCKET的读请求、写请求分别保证串行便可。





=============


关于GetQueuedCompletionStatus的返回值判断:


我给超时值传的是0,直接测试,无须等待。


这里咱们关心这几个值:


第二个参数所传回的byte值


第三个参数所传回的complete key值 ——PER HANDLE DATA


第四个参数所传回的OVERLAPPED结构指针 ——PER IO DATA


系统设置的ERROR值。





在超时状况下,byte值返回0,per handle data值是-1,per io data为NULL





1.若是返回FALSE


    one : iocp句柄在外部被关闭。


   WSAGetLastError返回6(无效句柄),byte值返回0,per handle data值是-1,per io data为NULL





    two: 咱们主动close一个socket句柄,或者CancelIO(socket)(且此时有未决的操做)


    WSAGetLastError返回995(因为线程退出或应用程序请求,已放弃 I/O 操做)


   byte值为0,


   per handle data与per io data正确传回。





   three:对端强退(且此时本地有未决的操做)


   WSAGetLastError返回64(指定的网络名再也不可用)


  byte值为0,per handle data与per io data正确传回 





2.若是返回TRUE【此时必定获得了你投递的OVERLAP结构】


    one:  我接收到对端数据,而后准备再投递接收请求;但此期间,对端关闭socket。


   WSARecv返回错误码10054:远程主机强迫关闭了一个现有的链接。


TODO TODO


   从网上搜到一个作法,感受很不错:


若是返回FALSE, 那么:若是OVERLAP为空,那必定是发生了错误(注意:请排除TIMEOUT错误);


若是OVERLAP不为空,有可能发生错误。不用管它,这里直接投递请求;若是有错,WSARecv将返回错误。关闭链接便可。








============


关于closesocket操做:





The closesocket function will initiate cancellation on the outstanding I/O operations, but that does not mean that an application will receive I/O completion for these I/O operations by the time the closesocket function returns. Thus, an application should not cleanup any resources (WSAOVERLAPPED structures, for example) referenced by the outstanding I/O requests until the I/O requests are indeed completed.





在IOCP模式下,若是调用closesocket时有未决的pending   IO,将致使socket被重置,因此有时会出现数据丢失。正统的解决方式是使用shutdown函数(指定SD_SEND标志),注意这时可能有未完成的发送pengding   IO,因此你应该监测是否该链接的全部是否已完成(也许你要用一个计数器来跟踪这些pending   IO),仅在全部send   pending   IO完成后调用shutdown。





MSDN推荐的优雅关闭socket:




Call WSAAsyncSelect to register for FD_CLOSE notification.
Call shutdown with how=SD_SEND.
When FD_CLOSE received, call recv until zero returned, or SOCKET_ERROR.
Call closesocket.



FD_CLOSE being posted after all data is read from a socket. An application should check for remaining data upon receipt of FD_CLOSE to avoid any possibility of losing data.


























对每一个使用AcceptEx接受的链接套结字使用setsockopt设置SO_UPDATE_ACCEPT_CONTEXT选项,这个选项原义是把listen套结字一些属性(包括socket内部接受/发送缓存大小等等)拷贝到新创建的套结字,却能够使后续的shutdown调用成功。



/* SO_UPDATE_ACCEPT_CONTEXT is required for shutdown() to work fine*/
       setsockopt( sockClient,
                            SOL_SOCKET,
                            SO_UPDATE_ACCEPT_CONTEXT,
                            (char*)&m_sockListen,
                            sizeof(m_sockListen) ) ;
若是是调用AcceptEX接收的链接 不设置该选项的话,随后的shutdown调用
将返回失败, WSAGetLastError() returns 10057 -- WSANOTCONN 




2012.10.24


用智能指针重构了网络库,替换了裸指针。


可是发现IOCP以下一个问题:




1. 收到14字节数据
2012-10-25[02_02_05_906[DBG]:OnRecv : Worker thread [6268], socket = 11256, bytes = 14


2.再次投递RECV请求,发生错误,由于对端已经关闭
2012-10-25[02_02_05_906[DBG]:Fatal error when post recv, error 10054, socket = 11256
2012-10-25[02_02_05_906[DBG]:Socket is set invalid 11256


3.因而准备回收资源,结束RECV请求;
2012-10-25[02_02_05_906[DBG]:EndRecv : Worker thread [6268], socket = 11256


4.但此时overlap结构仍然是挂起状态?
2012-10-25[02_02_05_906[DBG]:2EndRecv socket 11256, now recv overlappe is complete ? 0
相关文章
相关标签/搜索