首先咱们来看一个显示场景:在现实生活中有两我的技术人员A和B,在进行一问一答形式的交流。以下图所示: java
咱们来看这幅图的中的几个要点: python
他们两都使用中文进行交流。若是他们一人使用的是南斯拉夫语另外一人使用的是索马里语,而且相互都不能理解对方的语系,很显然A所要表达的内容B是没法理解的。 linux
他们的声音是在空气中进行传播的。空气除了支撑他们的呼吸外,还支撑了他们声音的传播。若是没有空气他们是没法知道对方用中文说了什么。 c++
他们的交流方式是协调一致的,即A问完一个问题后,等待B进行回答。收到B的回答后,A才能问下一个问题。 程序员
因为都是人类,因此他们处理信息的方式也是同样的:用嘴说话,用耳朵听话,用大脑处理造成结果。 apache
目前这个交流场景下,只有A和B两我的。可是随时有可能增长N我的进来。第N我的可能不是采用中文进行交流。 设计模式
很明显经过中文的交谈,两我的相互明白了对方的意图。为了保证信息传递的高效性,咱们必定会将信息作成某种参与者都理解的格式。例如:中文有其特定的语法结构,例如主谓宾,定状补。 服务器
在计算机领域为了保证信息可以被处理,信息也会被作成特定的格式,并且要确保目标可以明白这种格式。经常使用的信息格式包括: 微信
XML:
可扩展标记语言,这个语言由W3C(万维网联盟)进行发布和维护。XML语言应用之普遍,扩展之丰富。适合作网络通讯的信息描述格式(通常是“应用层”协 议了)。例如Google 定义的XMPP通讯协议就是使用XML进行描述的;不过XML的更普遍使用场景是对系统环境进行描述(由于它会形成较多的没必要要的内容传输),例如服务器 的配置描述、Spring的配置描述、Maven仓库描述等等。 网络
JSON:
JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。它和XML的设计思路是一致的:和语言无关(流行的语言都支持JSON格式描述:Go、Python、C、C++、C#、 JAVA、Erlang、JavaScript等等);可是和XML不一样,JSON的设计目标就是为了进行通讯。要描述一样的数据,JSON格式的容量会 更小。
protocol buffer(PB):
protocolbuffer(如下简称PB)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了三种语言的实现:java、c++ 和 python,每一种实现都包含了相应语言的编译器以及库文件。
TLV(三元组编码):
T(标记/类型域)L(长度/大小域)V(值/内容域),一般这种信息格式用于金融、军事领域。它经过字节的位运算来进行信息的序列化/反序列化(听说微信的信息格式也采用的是TLV,但实际状况我不清楚):
这里有一篇介绍TLV的文章:《通讯协议之序列化TLV》,TLV格式所携带的内容是最有效的,它就连JSON中用于分割层次的“{}”符号都没有。
在这个系列的博文中,咱们不会把信息格式做为一个重点,可是会花一些篇幅去比较各类信息格式在网络上传输的速度、性能,并为你们介绍几种典型的信息格式选型场景。
如文中第一张图描述的场景,有一个咱们看不到可是却很重要的元素:空气。声音在空气中完成传播,真空没法传播声音。一样信息是在网络中完成传播的,没有网络就无法传播信息。网络协议就是计算机领域的“空气”,下图中咱们以OSI模型做为参考:
物理层:物理层就是咱们的网络设备层,例如咱们的网卡、交换机等设备,在他们之间咱们通常传递的是电信号或者光信号。
数据链路层:数据链路又分为物理链路和逻辑链路。物理链路负责组合一组电信号,称之为“帧”;逻辑链路层经过一些规则和协议保证帧传输的正确性,而且可使来自于多个源/目标 的帧在同一个物理链路上进行传输,实现“链路复用”。
网络层:网络层使用最普遍的协议是IP协议(又分为IPV4协议和IPV6协议),IPX协议。这些协议解决的是源和目标的定位问题,以及从源如何到达目标的问题。
传输层:TCP、UDP是传输层最常使用的协议,传输层的最重要工做就是携带内容信息了,而且经过他们的协议规范提供某种通讯机制。举例来 说,TCP协议中的通讯机制是:首先进行三次通讯握手,而后再进行正式数据的传送,而且经过校验机制保证每一个数据报文的正确性,若是数据报文错误了,则重 新发送。
应用层:HTTP协议、FTP协议、TELNET协议这些都是应用层协议。应用层协议是最灵活的协议,甚至能够由程序员自行定义应用层协议。下图咱们表示了HTTP协议的工做方式:
在这个系列的博文中,咱们不会把网络协议做为一个重点。这是由于网络网络协议的知识是一个相对独立的的知识领域,十几篇文章都不必定讲得清楚。若是您对网络协议有兴趣,这里推荐两本书:《TCP/IP详解.卷1-协议》和《TCP/IP详解.卷2-实现》。
在文章最前面咱们看到其中一我的规定了一种沟通方式:“你必须把我说的话听完,而后给我反馈后。我才会问第二个问题”。这种沟通方式虽然沟通效率不高,可是颇有效:一个问题一个问题的处理。
可是若是参与沟通的人处理信息的能力比较强,那么他们还能够采用另外一种沟通方式:“我给我提的问题编了一个号,在问完第X个问题后,我不会等待你返回,就会问第X+1个问题,一样你在听完我第X个问题后,一边处理个人问题,一边听我第X+1个问题。”
实际上以上两种现实中的沟通方式,在计算机领域是能够找到对应的通讯方式的,这就是咱们这个系列的博文会着重讲的BIO(阻塞模式)通讯和NIO(非阻塞模式)。
之前大多数网络通讯方式都是阻塞模式的,即:
客户端向服务器端发出请求后,客户端会一直等待(不会再作其余事情),直到服务器端返回结果或者网络出现问题。
服务器端一样的,当在处理某个客户端A发来的请求时,另外一个客户端B发来的请求会等待,直到服务器端的这个处理线程完成上一个处理。
以下图所示:
传统的BIO通讯方式存在几个问题:
同一时间,服务器只能接受来自于客户端A的请求信息;虽然客户端A和客户端B的请求是同时进行的,但客户端B发送的请求信息只能等到服务器接受完A的请求数据后,才能被接受。
因为服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的状况下,是不能采用的。
上面说的状况是服务器只有一个线程的状况,那么读者会直接提出咱们可使用多线程技术来解决这个问题:
当服务器收到客户端X的请求后,(读取到全部请求数据后)将这个请求送入一个独立线程进行处理,而后主线程继续接受客户端Y的请求。
客户端一侧,也可使用一个子线程和服务器端进行通讯。这样客户端主线程的其余工做就不受影响了,当服务器端有响应信息的时候再由这个子线程经过 监听模式/观察模式(等其余设计模式)通知主线程。
以下图所示:
可是使用线程来解决这个问题其实是有局限性的:
虽然在服务器端,请求的处理交给了一个独立线程进行,可是操做系统通知accept()的方式仍是单个的。也就是,其实是服务器接收到数 据报文后的“业务处理过程”能够多线程,可是数据报文的接受仍是须要一个一个的来(下文的示例代码和debug过程咱们能够明确看到这一点)
在linux系统中,能够建立的线程是有限的。咱们能够经过cat /proc/sys/kernel/threads-max 命令查看能够建立的最大线程数。固然这个值是能够更改的,可是线程越多,CPU切换所需的时间也就越长,用来处理真正业务的需求也就越少。
建立一个线程是有较大的资源消耗的。JVM建立一个线程的时候,即便这个线程不作任何的工做,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您能够经过-Xss参数进行调整。
固然您还可使用ThreadPoolExecutor线程池来缓解线程的建立问题,可是又会形成BlockingQueue积压任务的持续增长,一样消耗了大量资源。
那么,若是你真想单纯使用线程解决阻塞的问题,那么您本身均可以算出来您一个服务器节点能够一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。
在这个系列的博文中,通讯方式/框架将做为一个重点进行讲解。包括NIO的原理,并经过讲解Netty的使用、JAVA原生NIO框架的使用,去熟悉这些核心原理。
实际上从上文中咱们能够看出,BIO的问题关键不在因而否使用了多线程(包括线程池)处理此次请求,而在于accept()、read()的操做点都是被阻塞。要测试这个问题,也很简单。咱们模拟了20个客户端(用20根线程模拟),利用JAVA的同步计数器CountDownLatch,保证这20个客户都初始化完成后而后同时向服务器发送请求,而后咱们来观察一下Server这边接受信息的状况。
package testBSocket; import java.util.concurrent.CountDownLatch; public class SocketClientDaemon { public static void main(String[] args) throws Exception { Integer clientNumber = 20; CountDownLatch countDownLatch = new CountDownLatch(clientNumber); //分别开始启动这20个客户端 for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) { SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index); new Thread(client).start(); } //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动全部线程后,进入等待状态 synchronized (SocketClientDaemon.class) { SocketClientDaemon.class.wait(); } } }
package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; /** * 一个SocketClientRequestThread线程模拟一个客户端请求。 * @author yinwenjie */ public class SocketClientRequestThread implements Runnable { static { BasicConfigurator.configure(); } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class); private CountDownLatch countDownLatch; /** * 这个线层的编号 * @param countDownLatch */ private Integer clientIndex; /** * countDownLatch是java提供的同步计数器。 * 当计数器数值减为0时,全部受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性 * @param countDownLatch */ public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) { this.countDownLatch = countDownLatch; this.clientIndex = clientIndex; } @Override public void run() { Socket socket = null; OutputStream clientRequest = null; InputStream clientResponse = null; try { socket = new Socket("localhost",83); clientRequest = socket.getOutputStream(); clientResponse = socket.getInputStream(); //等待,直到SocketClientDaemon完成全部线程的启动,而后全部线程一块儿发送请求 this.countDownLatch.await(); //发送请求信息 clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes()); clientRequest.flush(); //在这里等待,直到服务器返回信息 SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息"); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; int realLen; String message = ""; //程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,若是close了就收不到服务器的反馈了) while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { message += new String(contextBytes , 0 , realLen); } SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message); } catch (Exception e) { SocketClientRequestThread.LOGGER.error(e.getMessage(), e); } finally { try { if(clientRequest != null) { clientRequest.close(); } if(clientResponse != null) { clientResponse.close(); } } catch (IOException e) { SocketClientRequestThread.LOGGER.error(e.getMessage(), e); } } } }
package testBSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer1 { static { BasicConfigurator.configure(); } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer1.class); public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); try { while(true) { Socket socket = serverSocket.accept(); //下面咱们收取信息 InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; //这里也会被阻塞,直到有数据准备好 int realLen = in.read(contextBytes, 0, maxLen); //读取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); //关闭 out.close(); in.close(); socket.close(); } } catch(Exception e) { SocketServer1.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } }
客户端代码和上文同样,最主要是更改服务器端的代码:
package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer2 { static { BasicConfigurator.configure(); } private static final Log LOGGER = LogFactory.getLog(SocketServer2.class); public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); try { while(true) { Socket socket = serverSocket.accept(); //固然业务处理过程能够交给一个线程(这里可使用线程池),而且线程的建立是很耗资源的。 //最终改变不了.accept()只能一个一个接受socket的状况,而且被阻塞的状况 SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { SocketServer2.LOGGER.error(e.getMessage(), e); } finally { if(serverSocket != null) { serverSocket.close(); } } } } /** * 固然,接收到客户端的socket后,业务的处理过程能够交给一个线程来作。 * 但仍是改变不了socket被一个一个的作accept()的状况。 * @author yinwenjie */ class SocketServerThread implements Runnable { /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class); private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面咱们收取信息 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用线程,一样没法解决read方法的阻塞问题, //也就是说read方法处一样会被阻塞,直到操做系统有数据准备好 int realLen = in.read(contextBytes, 0, maxLen); //读取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 SocketServerThread.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); } catch(Exception e) { SocketServerThread.LOGGER.error(e.getMessage(), e); } finally { //试图关闭 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { SocketServerThread.LOGGER.error(e.getMessage(), e); } } } }
我相信服务器使用单线程的效果就不用看了,咱们主要看一看服务器使用多线程处理时的状况:
那么重点的问题并非“是否使用了多线程”,而是为何accept()、read()方法会被阻塞。即:异步IO模式 就是为了解决这样的并发性存在的。可是为了说清楚异步IO模式,在介绍IO模式的时候,咱们就要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。
这里我要特别说明一下,在一篇网文《Java NIO与IO的详细区别(通俗篇)》中, 做者主要讲到了本身对非阻塞方式下硬盘操做的理解。按照个人见解,只要有IO的存在,就会有阻塞或非阻塞的问题,不管这个IO是网络的,仍是硬盘的。这就 是为何基本的JAVA NIO框架中会有FileChannel(并且FileChannel在操做系统级别是不支持非阻塞模式的)、DatagramChannel和 SocketChannel的缘由。NIO并不仅是为了解决磁盘读写的性能而存在的,它的出现缘由、要解决的问题更为广阔;可是另一个方面,文章做者只是表达本身的思想,没有必要争论得“咬文嚼字”。
API文档中对于 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
那么咱们首先来看看为何serverSocket.accept()会被阻塞。这里涉及到阻塞式同步IO的工做原理:
若是操做系统没有发现有套接字从指定的端口X来,那么操做系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为何accept()方法为何会阻塞:它内部的实现是使用的操做系统级别的同步IO。
阻塞IO 和 非阻塞IO 这两个概念是程序级别的。主要描述的是程序请求操做系统IO操做后,若是IO资源没有准备好,那么程序该如何处理的问题:前者等待;后者继续执行(而且使用线程一直轮询,直到有IO资源准备好了)
同步IO 和 非同步IO,这两个概念是操做系统级别的。主要描述的是操做系统在收到程序请求IO操做后, 若是IO资源没有准备好,该如何相应程序的问题:前者不响应,直到IO资源准备好之后;后者返回一个标记(好让程序和本身知道之后的数据往哪里通知),当 IO资源准备好之后,再用事件机制返回给程序。