在Reactor和Proactor模型一文中讲到,Reactor模型提供了一个比较理想的I/O编程框架,让程序更有结构,用户使用起来更加方便,比裸API调用开发效率要高。另一方面,若是但愿每一个事件通知以后,作的事情能有机会被代理到某个线程里面去单独运行,而线程完成的状态又能通知回主任务,那么“异步”的机制就必须被引入。本文以boost.Asio库(其设计模式为Proactor)为基础,讲解为何须要异步编程以及异步编程的实现。html
设想你是一位体育老师,须要测验100位同窗的400米成绩。你固然不会让100位同窗一块儿起跑,由于当同窗们返回终点时,你根原本不及掐表记录各位同窗的成绩。react
若是你每次让一位同窗起跑并等待他回到终点你记下成绩后再让下一位起跑,直到全部同窗都跑完。恭喜你,你已经掌握了同步阻塞模式。你设计了一个函数,传入参数是学生号和起跑时间,返回值是到达终点的时间。你调用该函数100次,就能完成此次测验任务。这个函数是同步的,由于只要你调用它,就能获得结果;这个函数也是阻塞的,由于你一旦调用它,就必须等待,直到它给你结果,不能去干其余事情。git
若是你一边每隔10秒让一位同窗起跑,直到全部同窗出发完毕;另外一边每有一个同窗回到终点就记录成绩,直到全部同窗都跑完。恭喜你,你已经掌握了异步非阻塞模式。你设计了两个函数,其中一个函数记录起跑时间和学生号,该函数你会主动调用100次;另外一个函数记录到达时间和学生号,该函数是一个事件驱动的callback函数,当有同窗到达终点时,你会被动调用。你主动调用的函数是异步的,由于你调用它,它并不会告诉你结果;这个函数也是非阻塞的,由于你一旦调用它,它就立刻返回,你不用等待就能够再次调用它。但仅仅将这个函数调用100次,你并无完成你的测验任务,你还须要被动等待调用另外一个函数100次。github
固然,你立刻就会意识到,同步阻塞模式的效率明显低于异步非阻塞模式。那么,谁还会使用同步阻塞模式呢?不错,异步模式效率高,但更麻烦,你一边要记录起跑同窗的数据,一边要记录到达同窗的数据,并且同窗们回到终点的次序与起跑的次序并不相同,因此你还要不停地在你的成绩册上查找学生号。忙乱之中你每每会张冠李戴。你可能会想出更聪明的办法:你带了不少块秒表,让同窗们分组互相测验。恭喜你!你已经掌握了多线程同步模式!编程
每一个拿秒表的同窗均可以独立调用你的同步函数,这样既不容易出错,效率也大大提升,只要秒表足够多,同步的效率也能达到甚至超过异步。设计模式
能够理解,你现的问题多是:既然多线程同步既快又好,异步模式还有存在的必要吗?api
很遗憾,异步模式依然很是重要,由于在不少状况下,你拿不出不少秒表。你须要通讯的对端系统可能只容许你创建一个SOCKET链接,不少金融、电信行业的大型业务系统都如此要求。安全
如下部分主要来自于:http://www.javashuo.com/article/p-xgjicfxa-bo.html
依据微软的MSDN上的解说:
(1)、同步函数:当一个函数是同步执行时,那么当该函数被调用时不会当即返回,直到该函数所要作的事情全都作完了才返回。
(2)、异步函数:若是一个异步函数被调用时,该函数会当即返回尽管该函数规定的操做任务尚未完成。
(3)、在一个线程中分别调用上述两种函数会对调用线程有何影响呢?网络
(4)、一个调用了异步函数的线程如何与异步函数的执行结果同步呢?多线程
咱们是否已经发现了一个有趣的地方呢?!就是咱们可使用等待函数将一个异步执行的函数封装成一个同步函数。
操做系统发展到今天已经十分精巧,线程就是其中一个杰做。操做系统把 CPU 处理时间划分红许多短暂时间片,在时间 T1 执行一个线程的指令,到时间 T2 又执行下一线程的指令,各线程轮流执行,结果好象是全部线程在并肩前进。这样,编程时能够建立多个线程,在同一期间执行,各线程能够“并行”完成不一样的任务。
在单线程方式下,计算机是一台严格意义上的冯·诺依曼式机器,一段代码调用另外一段代码时,只能采用同步调用,必须等待这段代码执行完返回结果后,调用方才能继续往下执行。有了多线程的支持,能够采用异步调用,调用方和被调方能够属于两个不一样的线程,调用方启动被调方线程后,不等对方返回结果就继续执行后续代码。被调方执行完毕后,经过某种手段通知调用方:结果已经出来,请酌情处理。
计算机中有些处理比较耗时。调用这种处理代码时,调用方若是站在那里苦苦等待,会严重影响程序性能。例如,某个程序启动后若是须要打开文件读出其中的数据,再根据这些数据进行一系列初始化处理,程序主窗口将迟迟不能显示,让用户感到这个程序怎么等半天也不出来,太差劲了。借助异步调用能够把问题轻松化解:把整个初始化处理放进一个单独线程,主线程启动此线程后接着往下走,让主窗口瞬间显示出来。等用户盯着窗口犯呆时,初始化处理就在背后悄悄完成了。程序开始稳定运行之后,还能够继续使用这种技巧改善人机交互的瞬时反应。用户点击鼠标时,所激发的操做若是较费时,再点击鼠标将不会当即反应,整个程序显得很沉重。借助异步调用处理费时的操做,让主线程随时恭候下一条消息,用户点击鼠标时感到轻松快捷,确定会对软件产生好感。
异步调用用来处理从外部输入的数据特别有效。假如计算机须要从一台低速设备索取数据,而后是一段冗长的数据处理过程,采用同步调用显然很不合算:计算机先向外部设备发出请求,而后等待数据输入;而外部设备向计算机发送数据后,也要等待计算机完成数据处理后再发出下一条数据请求。双方都有一段等待期,拉长了整个处理过程。其实,计算机能够在处理数据以前先发出下一条数据请求,而后当即去处理数据。若是数据处理比数据采集快,要等待的只有计算机,外部设备能够连续不停地采集数据。若是计算机同时链接多台输入设备,能够轮流向各台设备发出数据请求,并随时处理每台设备发来的数据,整个系统能够保持连续高速运转。编程的关键是把数据索取代码和数据处理代码分别归属两个不一样的线程。数据处理代码调用一个数据请求异步函数,而后径自处理手头的数据。待下一组数据到来后,数据处理线程将收到通知,结束 wait 状态,发出下一条数据请求,而后继续处理数据。
异步调用时,调用方不等被调方返回结果就转身离去,所以必须有一种机制让被调方有告终果时能通知调用方。在同一进程中有不少手段能够利用,笔者经常使用的手段是回调、event 对象和消息。
回调:回调方式很简单:调用异步函数时在参数中放入一个函数地址,异步函数保存此地址,待有告终果后回调此函数即可以向调用方发出通知。若是把异步函数包装进一个对象中,能够用事件取代回调函数地址,经过事件处理例程向调用方发通知。
event : event 是 Windows 系统提供的一个经常使用同步对象,以在异步处理中对齐不一样线程之间的步点。若是调用方暂时无事可作,能够调用 wait 函数等在那里,此时 event 处于 nonsignaled 状态。当被调方出来结果以后,把 event 对象置于 signaled 状态,wait 函数便自动结束等待,使调用方从新动做起来,从被调方取出处理结果。这种方式比回调方式要复杂一些,速度也相对较慢,但有很大的灵活性,能够搞出不少花样以适应比较复杂的处理系统。
消息:借助 Windows 消息发通知是个不错的选择,既简单又安全。程序中定义一个用户消息,并由调用方准备好消息处理例程。被调方出来结果以后当即向调用方发送此消息,并经过 WParam 和 LParam 这两个参数传送结果。消息老是与窗口 handle 关联,所以调用方必须借助一个窗口才能接收消息,这是其不方便之处。另外,经过消息联络会影响速度,须要高速处理时回调方式更有优点。
若是调用方和被调方分属两个不一样的进程,因为内存空间的隔阂,通常是采用 Windows 消息发通知比较简单可靠,被调方能够借助消息自己向调用方传送数据。event 对象也能够经过名称在不一样进程间共享,但只能发通知,自己没法传送数据,须要借助 Windows 消息和 FileMapping 等内存共享手段或借助 MailSlot 和 Pipe 等通讯手段。
若是你的服务端的客户端数量多,你的服务端就采用异步的,可是你的客户端能够用同步的,客户端通常功能比较单一,收到数据后才能执行下面的工做,因此弄成同步的在那等。
同步异步指的是通讯模式,而阻塞和非阻塞指的是在接收和发送时是否等待动做完成才返回。
首先是通讯的同步,主要是指客户端在发送请求后,必须得在服务端有回应后才发送下一个请求。因此这个时候的全部请求将会在服务端获得同步。
其次是通讯的异步,指客户端在发送请求后,没必要等待服务端的回应就能够发送下一个请求,这样对于全部的请求动做来讲将会在服务端获得异步,这条请求的链路就象是一个请求队列,全部的动做在这里不会获得同步的。
阻塞和非阻塞只是应用在请求的读取和发送。
在实现过程当中,若是服务端是异步的话,客户端也是异步的话,通讯效率会很高,但若是服务端在请求的返回时也是返回给请求的链路时,客户端是能够同步的,这种状况下,服务端是兼容同步和异步的。相反,若是客户端是异步而服务端是同步的也不会有问题,只是处理效率低了些。
阻塞 block 是指,你拨通某人的电话,可是此人不在,因而你拿着电话等他回来,其间不能再用电话。同步大概和阻塞差很少。
非阻塞 nonblock 是指,你拨通某人的电话,可是此人不在,因而你挂断电话,待会儿再打。至于到时候他回来没有,只有打了电话才知道。即所谓的“轮询 / poll”。
异步是指,你拨通某人的电话,可是此人不在,因而你叫接电话的人告诉那人(leave a message),回来后给你打电话(call back)。
1、同步阻塞模式
在这个模式中,用户空间的应用程序执行一个系统调用,并阻塞,直到系统调用完成为止(数据传输完成或发生错误)。
2、同步非阻塞模式
同步阻塞 I/O 的一种效率稍低的。非阻塞的实现是 I/O 命令可能并不会当即知足,须要应用程序调用许屡次来等待操做完成。这可能效率不高,由于在不少状况下,当内核执行这个命令时,应用程序必需要进行忙碌等待,直到数据可用为止,或者试图执行其余工做。由于数据在内核中变为可用到用户调用 read 返回数据之间存在必定的间隔,这会致使总体数据吞吐量的下降。但异步非阻塞因为是多线程,效率仍是高。
/* create the connection by socket * means that connect "sockfd" to "server_addr" * 同步阻塞模式 */ if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1) { perror("connect"); exit(1); } /* 同步非阻塞模式 */ while (send(sockfd, snd_buf, sizeof(snd_buf), MSG_DONTWAIT) == -1) { sleep(1); printf("sleep\n"); }
前面说了那么多,如今终于能够回到咱们的正题,介绍异步编程了。就像以前所说的,同步编程比异步编程简单不少。这是由于,线性的思考是很简单的(调用A,调用A结束,调用B,调用B结束,而后继续,这是以事件处理的方式来思考)。后面你会碰到这种状况,好比:五件事情,你不知道它们执行的顺序,也不知道他们是否会执行!这部分主要参考:https://mmoaay.gitbooks.io/boost-asio-cpp-network-programming-chinese/content/Chapter2.html
尽管异步编程更难,可是你会更倾向于选择使用它,好比:写一个须要处理不少并发访问的服务端。并发访问越多,异步编程就比同步编程越简单。
假设:你有一个须要处理1000个并发访问的应用,从客户端发给服务端的每一个信息都会再返回给客户端,以‘\n’结尾。
同步方式的代码,1个线程:
using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // 每一个信息最多这么大 int already_read; // 你已经读了多少 }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock.available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // 分析消息,而后返回 if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... }
有一种状况是在任何服务端(和任何基于网络的应用)都须要避免的,就是代码无响应的状况。在咱们的例子里,咱们须要handle_clients()方法尽量少的阻塞。若是方法在某个点上阻塞,任何进来的信息都须要等待方法解除阻塞才能被处理。
为了保持响应,只在一个套接字有数据的时候咱们才读,也就是说,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read时,咱们只读当前可用的;调用read_until(c.sock, buffer(...), '\n')会是一个很是糟糕的选择,由于直到咱们从一个指定的客户端读取了完整的消息以前,它都是阻塞的(咱们永远不知道它何时会读取到完整的消息)
这里的瓶颈就是on_read_msg()方法;当它执行时,全部进来的消息都在等待。一个良好的on_read_msg()方法实现会保证这种状况基本不会发生,可是它仍是会发生(有时候向一个套接字写入数据,缓冲区满了时,它会被阻塞)
同步方式的代码,10个线程
using namespace boost::asio; struct client { // ... 和以前同样 bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // 已经在读取 else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // 和以前同样 } void on_read_msg(client & c, const std::string & msg) { // 和以前同样 }
为了使用多线程,咱们须要对线程进行同步,这就是set_reading()和set_unreading()所作的。set_reading()方法很是重要,好比你想要一步实现“判断是否在读取而后标记为读取中”。但这是有两步的(“判断是否在读取”和“标记为读取中”),你可能会有两个线程同时为一个客户端判断是否在读取,而后你会有两个线程同时为一个客户端调用on_read,结果就是数据冲突甚至致使应用崩溃。
你会发现代码变得极其复杂。
同步编程有第三个选择,就是为每一个链接开辟一个线程。可是当并发的线程增长时,这就成了一种灾难性的状况。
而后,让咱们来看异步编程。咱们不断地异步读取。当一个客户端请求某些东西时,on_read被调用,而后回应,而后等待下一个请求(而后开始另一个异步的read操做)。
异步方式的代码,10个线程
using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // 从客户端取回结果 } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // 等待同一个客户端下一个读取操做 async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); }
发现代码变得有多简单了吧?client结构里面只有两个成员,handle_clients()仅仅调用了async_read_until,而后它建立了10个线程,每一个线程都调用service.run()。这些线程会处理全部来自客户端的异步read操做,而后分发全部向客户端的异步write操做。另外须要注意的一件事情是:on_read()一直在为下一次异步read操做作准备(看最后一行代码)。
再一次说明,若是有等待执行的操做,run()会一直执行,直到你手动调用io_service::stop()。为了保证io_service一直执行,一般你添加一个或者多个异步操做,而后在它们被执行时,你继续一直不停地添加异步操做,好比下面代码:
using namespace boost::asio; io_service service; ip::tcp::socket sock(service); char buff_read[1024], buff_write[1024] = "ok"; void on_read(const boost::system::error_code &err, std::size_t bytes); void on_write(const boost::system::error_code &err, std::size_t bytes) { sock.async_read_some(buffer(buff_read), on_read); } void on_read(const boost::system::error_code &err, std::size_t bytes) { // ... 处理读取操做 ... sock.async_write_some(buffer(buff_write,3), on_write); } void on_connect(const boost::system::error_code &err) { sock.async_read_some(buffer(buff_read), on_read); } int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001); sock.async_connect(ep, on_connect); service.run(); }
假设你须要作下面的操做:
io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff));
在这个例子中,sock和buff的存在时间都必须比read()调用的时间要长。也就是说,在调用read()返回以前,它们都必须有效。这就是你所指望的;你传给一个方法的全部参数在方法内部都必须有效。当咱们采用异步方式时,事情会变得比较复杂。
io_service service; ip::tcp::socket sock(service); char buff[512]; void on_read(const boost::system::error_code &, size_t) {} ... async_read(sock, buffer(buff), on_read);
在这个例子中,sock和buff的存在时间都必须比read()操做自己时间要长,可是read操做持续的时间咱们是不知道的,由于它是异步的。
当使用socket缓冲区的时候,你会有一个buffer实例在异步调用时一直存在(使用boost::shared_array<>)。在这里,咱们可使用一样的方式,经过建立一个类并在其内部管理socket和它的读写缓冲区。而后,对于全部的异步操做,传递一个包含智能指针的boost::bind仿函数给它:
using namespace boost::asio; io_service service; struct connection : boost::enable_shared_from_this<connection> { typedef boost::system::error_code error_code; typedef boost::shared_ptr<connection> ptr; connection() : sock_(service), started_(true) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1)); } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { // 这里你决定用这个链接作什么: 读取或者写入 if ( !err) do_read(); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg == "can_login") do_write("access_data"); else if ( msg.find("data ") == 0) process_data(msg); else if ( msg == "login_fail") stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2)); } void do_write(const std::string & msg) { if ( !started() ) return; // 注意: 由于在作另一个async_read操做以前你想要发送多个消息, // 因此你须要多个写入buffer std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); } void process_data(const std::string & msg) { // 处理服务端来的内容,而后启动另一个写入操做 } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); connection::ptr(new connection)->start(ep); }
在全部异步调用中,咱们传递一个boost::bind仿函数看成参数。这个仿函数内部包含了一个智能指针,指向connection实例。只要有一个异步操做等待时,Boost.Asio就会保存boost::bind仿函数的拷贝,这个拷贝保存了指向链接实例的一个智能指针,从而保证connection实例保持活动。问题解决!
固然,connection类仅仅是一个框架类;你须要根据你的需求对它进行调整(它看起来会和当前服务端例子的状况至关不一样)。
你须要注意的是建立一个新的链接是至关简单的:connection::ptr(new connection)- >start(ep)。这个方法启动了到服务端的(异步)链接。当你须要关闭这个链接时,调用stop()。
当实例被启动时(start()),它会等待客户端的链接。当链接发生时。on_connect()被调用。若是没有错误发生,它启动一个read操做(do_read())。当read操做结束时,你就能够解析这个消息;固然你应用的on_read()看起来会各类各样。而当你写回一个消息时,你须要把它拷贝到缓冲区,而后像我在do_write()方法中所作的同样将其发送出去,由于这个缓冲区一样须要在这个异步写操做中一直存活。最后须要注意的一点——当写回时,你须要指定写入的数量,不然,整个缓冲区都会被发送出去。
网络api实际上要繁杂得多,这个章节只是作为一个参考,当你在实现本身的网络应用时能够回过头来看看。
Boost.Asio实现了端点的概念,你能够认为是IP和端口。若是你不知道准确的IP,你可使用resolver对象将主机名,例如www.yahoo.com转换为一个或多个IP地址。
咱们也能够看到API的核心——socket类。Boost.Asio提供了TCP、UDP和 ICMP的实现。并且你还能够用你本身的协议来对它进行扩展;固然,这个工做不适合缺少勇气的人。
异步编程是刚需。你应该已经明白为何有时候须要用到它,尤为在写服务端的时候。调用service.run()来实现异步循环就已经可让你很知足,可是有时候你须要更进一步,尝试使用run_one()、poll()或者poll_one()。
当实现异步时,你能够异步执行你本身的方法;使用service.post()或者service.dispatch()。
最后,为了使socket和缓冲区(read或者write)在整个异步操做的生命周期中一直活动,咱们须要采起特殊的防御措施。你的链接类须要继承自enabled_shared_from_this,而后在内部保存它须要的缓冲区,并且每次异步调用都要传递一个智能指针给this操做。