这个小节的介绍,在《架构设计:系统间通讯(1)——概述从“聊天”开始上篇》 这篇文章中已经说明了,这里只是“接着讲”,您能够理解成“在概述的基础上继续深刻写”。BIO就是:blocking IO。最容易理解、最容易实现的IO工做方式,应用程序向操做系统请求网络IO操做,这时应用程序会一直等待;另外一方面,操做系统收到请求后,也会等待, 直到网络上有数据传到监听端口;操做系统在收集数据后,会把数据发送给应用程序;最后应用程序受到数据,并解除等待状态。以下图所示: java
(请您注意,上图中交互的两个元素是应用程序和它所使用的操做系统)就TCP协议来讲,整个过程实际上分红三个步骤:三次握手创建链接、传输数据(包括验证和重发)、断开链接。固然,断开链接的过程并不在咱们讨论的IO的主要过程当中。但是咱们讨论IO模型,应该把创建链接和传输数据的者两个过程分开讨论。 linux
JAVA对阻塞模式的支持,就是java.net包中的Socket套接字实现。这里要说明一下,Socket套接字是TCP/UDP等传输层协议的实现。 例如客户端使用TCP协议链接这台服务器的时候,当TCP三次握手成功后,应用程序就会建立一个socket套接字对象(注意,这是尚未进行数据内容的 传输),当这个TCP链接出现数据传输时,socket套接字就会把数据传输的表现告诉程序员(例如read方法接触阻塞状态) 程序员
下面这段代码是java对阻塞模式的支持: apache
package testBSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer1 { static { BasicConfigurator.configure(); } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer1.class); public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); try { while(true) { //这里JAVA经过JNI请求操做系统,并一直等待操做系统返回结果(或者出错) Socket socket = serverSocket.accept(); //下面咱们收取信息(这里仍是阻塞式的,一直等待,直到有数据能够接受) InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); //read的时候,程序也会被阻塞,直到操做系统把网络传来的数据准备好。 while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 咱们假设读取到“over”关键字, * 表示客户端的全部信息在通过若干次传送后,完成 * */ if(message.indexOf("over") != -1) { break; } } //下面打印信息 SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); //关闭 out.close(); in.close(); socket.close(); } } catch(Exception e) { SocketServer1.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } }
上面的服务器端代码能够直接运行。代码执行到serverSocket.accept()的位置就会等待,这个调用的含义是应用程序向操做系统请求 客户端链接的接收,这是代码会阻塞,而底层调用的位置在DualStackPlainSocketImpl这个类里面(注意我使用的测试环境是 windows 8 ,因此是由这个类处理;若是您是在windows 7环境下进行测试,那么处理类是TwoStacksPlainSocketImpl,这是Windows环境;若是您使用的测试环境是Linux,那么视 Linux的内核版本而异,具体的处理类又是不同的)。 windows
很明显,咱们在代码里面并无设置timeout属性,因此运行的是“if”这段的代码,很明显在调用JNI后,下层也在等待有客户端链接上来。这种调用方式固然有问题: 服务器
同一时间,服务器只能接受来自于客户端A的请求信息;虽然客户端A和客户端B的请求是同时进行的,但客户端B发送的请求信息只能等到服务器接受完A的请求数据后,才能被接受。 网络
因为服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的状况下,是不能采用的。 多线程
实际上以上的问题是能够经过多线程来解决的,实际上就是当accept接收到一个客户端的链接后,服务器端启动一个新的线程,来读写客户端的数据,并完成相应的业务处理。可是你没法影响操做系统底层的“同步IO”机制。 架构
必定要注意:阻塞/非阻塞的描述是针对应用程序中的线程进行的,对于阻塞方式的一种改进是应用程序将其“一直等待”的状态主动打开,以下图所示: 并发
这种模式下,应用程序的线程再也不一直等待操做系统的IO状态,而是在等待一段时间后,就解除阻塞。若是没有获得想要的结果,则再次进行相同的操做。这样的工做方式,暴增了应用程序的线程能够不会一直阻塞,而是能够进行一些其余工做。
那么JAVA中是否支持这种非阻塞IO的工做模式呢?咱们继续分析DualStackPlainSocketImpl中的accept0实现:
那么timeout是在哪里设置的呢?在ServerSocket中,调用了DualStackPlainSocketImpl的父类SocketImpl进行timeout的设置:
ServerSocket中的setSoTimeout方法也有相应的注释说明:
Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.
那么java中对非阻塞IO的支持以下:
package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer2 { static { BasicConfigurator.configure(); } private static Object xWait = new Object(); /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer2.class); public static void main(String[] args) throws IOException { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(83); serverSocket.setSoTimeout(100); while(true) { Socket socket = null; try { socket = serverSocket.accept(); } catch(SocketTimeoutException e1) { //=========================================================== // 执行到这里,说明本次accept没有接收到任何数据报文 // 主线程在这里就能够作一些事情,记为X //=========================================================== synchronized (SocketServer2.xWait) { SocketServer2.LOGGER.info("此次没有从底层接收到任务数据报文,等待10毫秒,模拟事件X的处理时间"); SocketServer2.xWait.wait(10); } continue; } InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); //下面咱们收取信息(这里仍是阻塞式的,一直等待,直到有数据能够接受) while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 咱们假设读取到“over”关键字, * 表示客户端的全部信息在通过若干次传送后,完成 * */ if(message.indexOf("over") != -1) { break; } } //下面打印信息 SocketServer2.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); //关闭 out.close(); in.close(); socket.close(); } } catch(Exception e) { SocketServer2.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } }
执行效果以下:
这里咱们针对了SocketServer增长了阻塞等待时间,实际上只实现了非阻塞IO模型中的第一步:监听链接状态的非阻塞。经过运行代码,咱们能够发现read()方法仍是被阻塞的,说明socket套接字等待数据读取的过程,仍是阻塞方式。
那么,咱们能不能改进read()方式,让它也变成非阻塞模式呢?固然是能够的,socket套接字一样支持等待超时时间设置。代码以下:
package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer3 { static { BasicConfigurator.configure(); } private static Object xWait = new Object(); /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer3.class); public static void main(String[] args) throws IOException { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(83); serverSocket.setSoTimeout(100); while(true) { Socket socket = null; try { socket = serverSocket.accept(); } catch(SocketTimeoutException e1) { //=========================================================== // 执行到这里,说明本次accept没有接收到任何TCP链接 // 主线程在这里就能够作一些事情,记为X //=========================================================== synchronized (SocketServer3.xWait) { SocketServer3.LOGGER.info("此次没有从底层接收到任何TCP链接,等待10毫秒,模拟事件X的处理时间"); SocketServer3.xWait.wait(10); } continue; } InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); //下面咱们收取信息(设置成非阻塞方式,这样read信息的时候,又能够作一些其余事情) socket.setSoTimeout(10); BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 咱们假设读取到“over”关键字, * 表示客户端的全部信息在通过若干次传送后,完成 * */ if(message.indexOf("over") != -1) { break BIORead; } } } catch(SocketTimeoutException e2) { //=========================================================== // 执行到这里,说明本次read没有接收到任何数据流 // 主线程在这里又能够作一些事情,记为Y //=========================================================== SocketServer3.LOGGER.info("此次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间"); continue; } } //下面打印信息 SocketServer3.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); //关闭 out.close(); in.close(); socket.close(); } } catch(Exception e) { SocketServer3.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } }
这样一来,咱们利用JAVA实现了完整的“非阻塞IO”模型:让TCP链接和数据读取这两个过程,都变成了“非阻塞”方式了。
然并卵,这种处理方式实际上并无解决accept方法、read方法阻塞的根本问题。根据上文的叙述,accept方法、read方法阻塞的根本 问题是底层接受数据报文时的“同步IO”工做方式。这两次改进过程,只是解决了IO操做的两步中的第一步:将程序层面的阻塞方式变成了非阻塞方式。
另外一个方面,因为应用程序级别,咱们并无使用多线程技术,这就致使了应用程序只能一个socket套接字 一个socket套接字的处理。这个socket套接字没有处理完,就无法处理下一个socket套接字。针对这个问题咱们仍是能够进行改进的:让应用程 序层面上,各个socket套接字的处理不相互影响:
package testBSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; /** * 经过加入线程的概念,让socket server可以在应用层面, * 经过非阻塞的方式同时处理多个socket套接字 * @author yinwenjie */ public class SocketServer4 { static { BasicConfigurator.configure(); } private static Object xWait = new Object(); private static final Log LOGGER = LogFactory.getLog(SocketServer4.class); public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); serverSocket.setSoTimeout(100); try { while(true) { Socket socket = null; try { socket = serverSocket.accept(); } catch(SocketTimeoutException e1) { //=========================================================== // 执行到这里,说明本次accept没有接收到任何TCP链接 // 主线程在这里就能够作一些事情,记为X //=========================================================== synchronized (SocketServer4.xWait) { SocketServer4.LOGGER.info("此次没有从底层接收到任何TCP链接,等待10毫秒,模拟事件X的处理时间"); SocketServer4.xWait.wait(10); } continue; } //固然业务处理过程能够交给一个线程(这里可使用线程池),而且线程的建立是很耗资源的。 //最终改变不了.accept()只能一个一个接受socket链接的状况 SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { SocketServer4.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } } /** * 固然,接收到客户端的socket后,业务的处理过程能够交给一个线程来作。 * 但仍是改变不了socket被一个一个的作accept()的状况。 * @author yinwenjie */ class SocketServerThread implements Runnable { /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class); private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); //下面咱们收取信息(设置成非阻塞方式,这样read信息的时候,又能够作一些其余事情) this.socket.setSoTimeout(10); BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 咱们假设读取到“over”关键字, * 表示客户端的全部信息在通过若干次传送后,完成 * */ if(message.indexOf("over") != -1) { break BIORead; } } } catch(SocketTimeoutException e2) { //=========================================================== // 执行到这里,说明本次read没有接收到任何数据流 // 主线程在这里又能够作一些事情,记为Y //=========================================================== SocketServerThread.LOGGER.info("此次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间"); continue; } } //下面打印信息 Long threadId = Thread.currentThread().getId(); SocketServerThread.LOGGER.info("服务器(线程:" + threadId + ")收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); //关闭 out.close(); in.close(); this.socket.close(); } catch(Exception e) { SocketServerThread.LOGGER.error(e.getMessage(), e); } } }
引入了多线程技术后,IO的处理吞吐量大大提升了,可是这样作就真的没有问题了吗,您要知道操做系统但是有“最大线程”限制的:
虽然在服务器端,请求的处理交给了一个独立线程进行,可是操做系统通知accept()的方式仍是单个处理的(甚至都不是非阻塞模式)。也 就是说,其实是服务器接收到数据报文后的“业务处理过程”能够多线程(包括能够是非阻塞模式),可是数据报文的接受仍是须要一个一个的来。
在linux系统中,能够建立的线程是有限的。咱们能够经过cat /proc/sys/kernel/threads-max 命令查看能够建立的最大线程数。固然这个值是能够更改的,可是线程越多,CPU切换所需的时间也就越长,用来处理真正业务的需求也就越少。
建立一个线程是有较大的资源消耗的。JVM建立一个线程的时候,即便这个线程不作任何的工做,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您能够经过-Xss参数进行调整。
固然您还可使用ThreadPoolExecutor线程池来缓解线程的建立问题,可是又会形成BlockingQueue积压任务的持续增长,一样消耗了大量资源。另外,若是您的应用程序大量使用长链接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。
最后,不管您是使用的多线程、仍是加入了非阻塞模式,这都是在应用程序层面的处理,而底层socketServer所匹配的操做系统的IO模型始终是“同步IO”,最根本的问题并无解决。
那么,若是你真想单纯使用线程来解决问题,那么您本身均可以计算出来您一个服务器节点能够一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。
我将详细讲解操做系统支持的多路复用IO的工做方式,并介绍JAVA 1.4版本中加入的 JAVA NIO对多路复用IO的实现。(东西太多,咱们放下下篇中)
我将详细讲解操做系统支持的异步IO方式,并介绍JAVA 1.7版本中加入的NIO2.0(AIO)对异步IO的实现。(东西太多,咱们放下下篇中)