<font color="#333333" face="PingFangSC, helvetica neue, hiragino sans gb, arial, microsoft yahei ui, microsoft yahei, simsun, sans-serif">微信公众号【黄小斜】做者是蚂蚁金服 JAVA 工程师,专一于 JAVA 后端技术栈:SpringBoot、SSM全家桶、MySQL、分布式、中间件、微服务,同时也懂点投资理财,坚持学习和写做,相信终身学习的力量!关注公众号后回复”架构师“便可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送做者原创的Java学习指南、Java程序员面试指南等干货资源。</font>
git 地址:https://github.com/jasonGeng88/java-network-programmingjava
接着上一篇中的站点访问问题,若是咱们须要并发访问10个不一样的网站,咱们该如何处理?git
在上一篇中,咱们使用了java.net.socket
类来实现了这样的需求,以一线程处理一链接的方式,并配以线程池的控制,貌似获得了当前的最优解。但是这里也存在一个问题,链接处理是同步的,也就是并发数量增大后,大量请求会在队列中等待,或直接异常抛出。程序员
为解决这问题,咱们发现元凶处在“一线程一请求”上,若是一个线程能同时处理多个请求,那么在高并发下性能上会大大改善。这里就借住 JAVA 中的 nio 技术来实现这一模型。github
关于什么是 nio,从字面上理解为 New IO,就是为了弥补本来 I/O 上的不足,而在 JDK 1.4 中引入的一种新的 I/O 实现方式。简单理解,就是它提供了 I/O 的阻塞与非阻塞的两种实现方式(_固然,默认实现方式是阻塞的。_)。面试
下面,咱们先来看下 nio 以阻塞方式是如何处理的。数据库
有了上一篇 socket 的经验,咱们的第一步必定也是创建 socket 链接。只不过,这里不是采用 new socket()
的方式,而是引入了一个新的概念 SocketChannel
。它能够看做是 socket 的一个完善类,除了提供 Socket 的相关功能外,还提供了许多其余特性,如后面要讲到的向选择器注册的功能。后端
创建链接代码实现:微信
<pre>// 初始化 socket,创建 socket 与 channel 的绑定关系
SocketChannel socketChannel = SocketChannel.open();
// 初始化远程链接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 处理设置阻塞,这也是默认的方式,可不设置
socketChannel.configureBlocking(true);
// 创建链接
socketChannel.connect(remote);</pre>数据结构
由于是一样是 I/O 阻塞的实现,因此后面的关于 socket 输入输出流的处理,和上一篇的基本相同。惟一差异是,这里须要经过 channel 来获取 socket 链接。
<pre>Socket socket = socketChannel.socket();</pre>
<pre>PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());</pre>
<pre>package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
public class NioBlockingHttpClient {
private SocketChannel socketChannel; private String host; public static void main(String[] args) throws IOException { for (String host: HttpConstant.HOSTS) { NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT); client.request(); } } public NioBlockingHttpClient(String host, int port) throws IOException { this.host = host; socketChannel = SocketChannel.open(); socketChannel.socket().setSoTimeout(5000); SocketAddress remote = new InetSocketAddress(this.host, port); this.socketChannel.connect(remote); } public void request() throws IOException { PrintWriter pw = getWriter(socketChannel.socket()); BufferedReader br = getReader(socketChannel.socket()); pw.write(HttpUtil.compositeRequest(host)); pw.flush(); String msg; while ((msg = br.readLine()) != null){ System.out.println(msg); } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream out = socket.getOutputStream(); return new PrintWriter(out); } private BufferedReader getReader(Socket socket) throws IOException { InputStream in = socket.getInputStream(); return new BufferedReader(new InputStreamReader(in)); }
}</pre>
nio 的阻塞实现,基本与使用原生的 socket 相似,没有什么特别大的差异。
下面咱们来看看它真正强大的地方。到目前为止,咱们将的都是阻塞 I/O。何为阻塞 I/O,看下图:
咱们主要观察图中的前三种 I/O 模型,关于异步 I/O,通常须要依靠操做系统的支持,这里不讨论。
从图中能够发现,阻塞过程主要发生在两个阶段上:
这里产生了一个从内核到用户空间的拷贝,主要是为了系统的性能优化考虑。假设,从网卡读到的数据直接返回给用户空间,那势必会形成频繁的系统中断,由于从网卡读到的数据不必定是完整的,可能断断续续的过来。经过内核缓冲区做为缓冲,等待缓冲区有足够的数据,或者读取完结后,进行一次的系统中断,将数据返回给用户,这样就能避免频繁的中断产生。
了解了 I/O 阻塞的两个阶段,下面咱们进入正题。看看一个线程是如何实现同时处理多个 I/O 调用的。从上图中的非阻塞 I/O 能够看出,仅仅只有第二阶段须要阻塞,第一阶段的数据等待过程,咱们是不须要关心的。不过该模型是频繁地去检查是否就绪,形成了 CPU 无效的处理,反而效果很差。若是有一种相似的好莱坞原则— “不要给咱们打电话,咱们会打给你” 。这样一个线程能够同时发起多个 I/O 调用,而且不须要同步等待数据就绪。在数据就绪完成的时候,会以事件的机制,来通知咱们。这样不就实现了单线程同时处理多个 IO 调用的问题了吗?即所说的“I/O 多路复用模型”。
废话讲了一大堆,下面就来实际操刀一下。
由上面分析能够,咱们得有一个选择器,它能监听全部的 I/O 操做,而且以事件的方式通知咱们哪些 I/O 已经就绪了。
代码以下:
<pre>import java.nio.channels.Selector;
...
private static Selector selector;
static {
try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); }
}
</pre>
下面,咱们来建立一个非阻塞的 SocketChannel
,代码与阻塞实现类型,惟一不一样是socketChannel.configureBlocking(false)
。
注意:只有在socketChannel.configureBlocking(false)
以后的代码,才是非阻塞的,若是socketChannel.connect()
在设置非阻塞模式以前,那么链接操做依旧是阻塞调用的。
<pre>SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 设置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);</pre>
选择器与 socket 都建立好了,下一步就是将二者进行关联,好让选择器和监听到 Socket 的变化。这里采用了以 SocketChannel
主动注册到选择器的方式进行关联绑定,这也就解释了,为何不直接new Socket()
,而是以SocketChannel
的方式来建立 socket。
代码以下:
<pre>socketChannel.register(selector,
SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);</pre>
上面代码,咱们将 socketChannel 注册到了选择器中,而且对它的链接、可读、可写事件进行了监听。
具体的事件监听类型以下:
操做类型 | 值 | 描述 | 所属对象 |
---|---|---|---|
OP_READ | 1 << 0 | 读操做 | SocketChannel |
OP_WRITE | 1 << 2 | 写操做 | SocketChannel |
OP_CONNECT | 1 << 3 | 链接socket操做 | SocketChannel |
OP_ACCEPT | 1 << 4 | 接受socket操做 | ServerSocketChannel |
如今,选择器已经与咱们关心的 socket 进行了关联。下面就是感知事件的变化,而后调用相应的处理机制。
这里与 Linux 下的 selector 有点不一样,nio 下的 selecotr 不会去遍历全部关联的 socket。咱们在注册时设置了咱们关心的事件类型,每次从选择器中获取的,只会是那些符合事件类型,而且完成就绪操做的 socket,减小了大量无效的遍历操做。
public void select() throws IOException { // 获取就绪的 socket 个数 while (selector.select() > 0){ // 获取符合的 socket 在选择器中对应的事件句柄 key Set keys = selector.selectedKeys(); // 遍历全部的key Iterator it = keys.iterator(); while (it.hasNext()){ // 获取对应的 key,并从已选择的集合中移除 SelectionKey key = (SelectionKey)it.next(); it.remove(); if (key.isConnectable()){ // 进行链接操做 connect(key); } else if (key.isWritable()){ // 进行写操做 write(key); } else if (key.isReadable()){ // 进行读操做 receive(key); } } } }
注意:这里的selector.select()
是同步阻塞的,等待有事件发生后,才会被唤醒。这就防止了 CPU 空转的产生。固然,咱们也能够给它设置超时时间,selector.select(long timeout)
来结束阻塞过程。
下面,咱们分别来看下,一个 socket 是如何来处理链接、写入数据和读取数据的(_这些操做都是阻塞的过程,只是咱们将等待就绪的过程变成了非阻塞的了_)。
处理链接代码:
<pre>// SelectionKey 表明 SocketChannel 在选择器中注册的事件句柄
private void connect(SelectionKey key) throws IOException {
// 获取事件句柄对应的 SocketChannel SocketChannel channel = (SocketChannel) key.channel();
// 真正的完成 socket 链接
channel.finishConnect();
// 打印链接信息
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); int port = remote.getPort(); System.out.println(String.format("访问地址: %s:%s 链接成功!", host, port));
}</pre>
<pre>// 字符集处理类
private Charset charset = Charset.forName("utf8");
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); // 获取 HTTP 请求,同上一篇 String request = HttpUtil.compositeRequest(host); // 向 SocketChannel 写入事件 channel.write(charset.encode(request)); // 修改 SocketChannel 所关心的事件 key.interestOps(SelectionKey.OP_READ);
}</pre>
这里有两个地方须要注意:
channel.write(charset.encode(request));
进行数据写入。有人会说,为何不能像上面同步阻塞那样,经过PrintWriter
包装类进行操做。由于PrintWriter
的 write()
方法是阻塞的,也就是说要等数据真正从 socket 发送出去后才返回。这与咱们这里所讲的阻塞是不一致的,这里的操做虽然也是阻塞的,但它发生的过程是在数据从用户空间到内核缓冲区拷贝过程。至于系统将缓冲区的数据经过 socket 发送出去,这不在阻塞范围内。也解释了为何要用 Charset
对写入内容进行编码了,由于缓冲区接收的格式是ByteBuffer
。
第二,选择器用来监听事件变化的两个参数是 interestOps
与 readyOps
。
SocketChannel
所关心的事件类型,也就是告诉选择器,当有这几种事件发生时,才来通知我。这里经过key.interestOps(SelectionKey.OP_READ);
告诉选择器,以后我只关心“读就绪”事件,其余的不用通知我了。SocketChannel
当前就绪的事件类型。以key.isReadable()
为例,判断依据就是:return (readyOps() & OP_READ) != 0;
<pre>private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String receiveData = charset.decode(buffer).toString(); // 当再没有数据可读时,取消在选择器中的关联,并关闭 socket 链接 if ("".equals(receiveData)) { key.cancel(); channel.close(); return; } System.out.println(receiveData);
}</pre>
这里的处理基本与写入一致,惟一要注意的是,这里咱们须要自行处理去缓冲区读取数据的操做。首先会分配一个固定大小的缓冲区,而后从内核缓冲区中,拷贝数据至咱们刚分配固定缓冲区上。这里存在两种状况:
SocketChannel
处理读就绪状态,那下一次会继续读取。固然,分配太小,会增长遍历次数。最后,将一下 ByteBuffer
的结构,它主要有 position, limit,capacity 以及 mark 属性。以 buffer.flip();
为例,讲下各属性的做用(_mark 主要是用来标记以前 position 的位置,是在当前 postion 没法知足的状况下使用的,这里不做讨论_)。
从图中看出,
<pre>package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class NioNonBlockingHttpClient {
private static Selector selector; private Charset charset = Charset.forName("utf8"); static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { NioNonBlockingHttpClient client = new NioNonBlockingHttpClient(); for (String host: HttpConstant.HOSTS) { client.request(host, HttpConstant.PORT); } client.select(); } public void request(String host, int port) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.socket().setSoTimeout(5000); SocketAddress remote = new InetSocketAddress(host, port); socketChannel.configureBlocking(false); socketChannel.connect(remote); socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE); } public void select() throws IOException { while (selector.select(500) > 0){ Set keys = selector.selectedKeys(); Iterator it = keys.iterator(); while (it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); it.remove(); if (key.isConnectable()){ connect(key); } else if (key.isWritable()){ write(key); } else if (key.isReadable()){ receive(key); } } } } private void connect(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.finishConnect(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); int port = remote.getPort(); System.out.println(String.format("访问地址: %s:%s 链接成功!", host, port)); } private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); String request = HttpUtil.compositeRequest(host); System.out.println(request); channel.write(charset.encode(request)); key.interestOps(SelectionKey.OP_READ); } private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String receiveData = charset.decode(buffer).toString(); if ("".equals(receiveData)) { key.cancel(); channel.close(); return; } System.out.println(receiveData); }
}
</pre>
本文从 nio 的阻塞方式讲起,介绍了阻塞 I/O 与非阻塞 I/O 的区别,以及在 nio 下是如何一步步构建一个 IO 多路复用的模型的客户端。文中须要理解的内容比较多,若是有理解错误的地方,欢迎指正~
<pre>public static void main(String[] args) {
基于线程池的伪异步NIO模型 a = new 基于线程池的伪异步NIO模型();
a.startServer(); }
private Charset charset = Charset.forName("utf8"); class WriteThread implements Runnable {
private SelectionKey key;
public WriteThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
Socket socket = socketChannel.socket();
try {
socketChannel.finishConnect();
} catch (IOException e) {
e.printStackTrace();
}
InetSocketAddress remote = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System._out_.println(String.format("访问地址: %s:%s 链接成功!", host, port)); }
}
class ReadThread implements Runnable {
private SelectionKey key;
public ReadThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
buffer.flip();
String receiveData = null;
try {
receiveData = new String(buffer.array(), "utf8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if ("".equals(receiveData)) { key.cancel();
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
System._out_.println(receiveData);
}
}
class ConnectThread implements Runnable {
private SelectionKey key;
public ConnectThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = charset.encode("hello world");
try {
socketChannel.write(byteBuffer);
System._out_.println("hello world");
} catch (IOException e) {
e.printStackTrace();
}
key.interestOps(SelectionKey._OP_READ_);
}
}
public void startServer() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
SocketChannel socketChannel = SocketChannel.open();
Selector selector = Selector.open(); socketChannel.configureBlocking(false);
InetSocketAddress inetAddress = new InetSocketAddress(1234); socketChannel.connect(inetAddress);
socketChannel.register(selector, SelectionKey._OP_CONNECT_ |
SelectionKey._OP_READ_ | SelectionKey._OP_WRITE_); while (selector.select(500) > 0) { Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
if (key.isConnectable()) {
executorService.submit(new ConnectThread(key));
}else if(key.isReadable()) {
executorService.submit(new ReadThread(key));
}else {
executorService.submit(new WriteThread(key));
}
} } } catch (IOException e) { e.printStackTrace();
}
}</pre>
<pre>class NioNonBlockingHttpServer {
private static Selector _selector_;
private Charset charset = Charset.forName("utf8"); static {
try { _selector_ = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
} public static void main(String[] args) throws IOException { NioNonBlockingHttpServer httpServer = new NioNonBlockingHttpServer();
httpServer.select(); }
public void request(int port) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setSoTimeout(5000);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8383)); // serverSocketChannel.register(selector, // SelectionKey.OP_CONNECT // | SelectionKey.OP_READ // | SelectionKey.OP_WRITE);
}
public void select() throws IOException { while (_selector_.select(500) > 0) { Set keys = _selector_.selectedKeys(); Iterator it = keys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next();
it.remove(); if (key.isAcceptable()) {
accept(key);
} else if (key.isWritable()) {
write(key);
} else if (key.isReadable()) {
receive(key);
}
} } } private void accept(SelectionKey key) throws IOException { SocketChannel socketChannel;
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
socketChannel = channel.accept();//接受链接请求
socketChannel.configureBlocking(false); socketChannel.register(_selector_, SelectionKey._OP_READ_ | SelectionKey._OP_WRITE_); InetSocketAddress local = (InetSocketAddress) channel.socket().getLocalSocketAddress();
String host = local.getHostName();
int port = local.getPort();
System._out_.println(String.format("请求地址: %s:%s 接收成功!", host, port)); }
private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress local = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = local.getHostName();
String msg = "hello Client";
channel.write(charset.encode(msg)); System._out_.println(msg);
key.interestOps(SelectionKey._OP_READ_);
}
private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString(); if ("".equals(receiveData)) {
key.cancel();
channel.close();
return; }
System._out_.println(receiveData);
}}</pre>