在上一篇文章中咱们介绍了Java基本IO,也就是阻塞式IO(BIO),在JDK1.4版本后推出了新的IO系统(NIO),也能够理解为非阻塞IO(Non-Blocking IO)。引用《Java NIO》中的一段话来解释一下NIO出现的缘由:html
操做系统与 Java 基于流的 I/O模型有些不匹配。操做系统要移动的是大块数据(缓冲区),这每每是在硬件直接存储器存取( DMA)的协助下完成的。而 JVM 的 I/O 类喜欢操做小块数据——单个字节、几行文本。结果,操做系统送来整缓冲区的数据, java.io 的流数据类再花大量时间把它们拆成小块,每每拷贝一个小块就要往返于几层对象。操做系统喜欢整卡车地运来数据, java.io 类则喜欢一铲子一铲子地加工数据。有了 NIO,就能够轻松地把一卡车数据备份到您能直接使用的地方( ByteBuffer 对象)。可是Java里的RandomAccessFile类是比较接近操做系统的方式。java
能够看出Java原生的IO模型之因此慢,是由于与操做系统的操做方式不匹配形成的,那么NIO之因此比BIO快主要就是用到了缓冲区相关的技术,接下来慢慢介绍这些技术点。git
下图描述了操做系统中数据是如何从外部存储向运行中的进程内存区域移动的过程:进程使用read()系统调用要求缓冲区被填充满。内核随即向磁盘控制器发出指令,要求其从磁盘读取数据。磁盘控制器经过DMA直接把磁盘上的数据写入缓冲区,这一步不须要CPU的参与。当缓冲区填满时,内核将数据从临时缓冲区拷贝到进程执行read()调用时指定的缓冲区。github
这里须要主要为何要执行系统调用这样一个中间步骤而不是直接DMA到进程的缓冲区,是由于用户空间是没法直接操做硬件的,另外磁盘这种块存储设备操做的是固定大小的数据块,而用户请求的则是非规则大小的数据,内核空间在这里的做用就是分解、重组的做用。数组
Java NIO主要依赖的组件有三个:缓冲区Buffer、通道Channel和选择器Selector。服务器
Buffer家族主要有这么些个成员,根据类名也大概能猜到它们的用处,用的最多的是ByteBuffer,在下面的例子中也会主要用到它。
app
在这里就不仔细讲Buffer类的API了,由于须要用的时候能够去查Java Doc,而以几个经常使用的操做来说述一下怎么使用Buffer。dom
容量(capacity):缓冲区的最大大小socket
上界(limit):缓冲区当前的大小函数
位置(position):下一个要读写的位置,由get()和put()更新
标记(mark):备忘位置,由mark()来指定mark = position,由reset()来指定position=mark
它们之间的大小关系:
0 <= mark <= position <= limit <= capacity
一种最经常使用的方式是:
ByteBuffer buffer = ByteBuffer.allocate(1024);
这种方法是建立一个1024字节大小的缓冲区。也能够用下面这种方式来包装本身建立的字节数组。
byte[] bytes = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Buffer在填充完毕后须要传递到一个通道中,这时若是直接读取Buffer,实际上是什么都读不到的。由于Buffer的设计中是有一个指针概念的,指向当前的位置,当一个Buffer填充完毕时指针是指向末尾的,所以在读取时应该将指针指向Buffer的头部,简单的方法就是使用下面这个方法:
Buffer.flip();
flip的实现以下:
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
能够看出flip实际上是把当前的limit从capacity变成了position,又把position放到了缓冲区的起点,并取消了mark。
Buffer.clear();
clear的实现以下:
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
clear函数就是将position放到起点,并重置limiti为capacity,以及取消mark。
Buffer.rewind();
rewind的实现以下:
public final Buffer rewind() { position = 0; mark = -1; return this; }
rewind和flip的区别在于没有改变limit的值。
Buffer.compact()
开始我不是很理解Channel这个东西为何要存在,看了书才慢慢明白,缓冲区为咱们装载了数据,可是数据的写入和读取并不能直接进行read()和write()这样的系统调用,而是JVM为咱们提供了一层对系统调用的封装。而Channel能够用最小的开销来访问操做系统自己的IO服务,这就是为何要有Channel的缘由。
下面来说讲经常使用的几个Channel类及其经常使用的方法。
I/O从广义上能够分为File I/O和Stream I/O,对应到通道来讲就有文件通道和socket通道,具体的说是FileChannle类和SocketChannel、ServerSocketChannel和DatagramChannel类。
它们之间的区别仍是很大的,从继承关系上来看:
public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel
FileChannel主要是继承了可中断接口,而对于socket相关的Channel类都继承AbstractSelectableChannel,这是选择器(Selector)相关的通道,在下一节中具体讲解。
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
FileChannel只能经过工厂方法来实例化,那就是调用RandomAccessFile、FileInputStream和FileOutputStream的getChannel()方法。如:
RandomAccessFile file = new RandomAccessFile("a.txt", "r"); FileChannel fc = file.getChannel();
先看看FileChannel提供的方法句柄:
public abstract int read(ByteBuffer dst) throws IOException;//把通道中数据传到目的缓冲区中,dst是destination的缩写 public abstract int write(ByteBuffer src) throws IOException;//把源缓冲区中的内容写到指定的通道中去
从句柄能够看出FileChannel是既能够读又能够写的,是全双工的。下面这个例子用来展现FileChannel是如何进行读和写的。
public class FileChannelTest { public static void readFile(String path) throws IOException { FileChannel fc = new FileInputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(128); StringBuilder sb = new StringBuilder(); while ((fc.read(buffer)) >= 0) { //翻转指针 buffer.flip(); //remaining = limit - position byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String string = new String(bytes, "UTF-8"); sb.append(string); //清空buffer buffer.clear(); } System.out.println(sb.toString()); } public static void writeFile(String path, String string) throws IOException { FileChannel fc = new FileOutputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(10); int current = 0; int len = string.getBytes().length; while (current < len) { for (int i=0;i<10;i++) { if (current+i>=len) break; buffer.put(string.getBytes()[current+i]); } current += buffer.position(); buffer.flip(); fc.write(buffer); buffer.clear(); } } public static void main(String[] args) throws IOException { String in = "D:/in.txt"; String out = "D:/out.txt"; readFile(in); writeFile(out, "hello world"); readFile(out); } }
分析一下上面这段代码,在readFile()函数中,经过FileInputStream.getChannel()获得FileChannel对象,并建立ByteBuffer对象,接着利用FileChannel的read方法填充buffer,获得填充完的buffer以后咱们将buffer的当前指针翻转一下接着利用buffer的get方法把数据放到byte数组中,接着就能够读取数据了。
读取文件的整个过程相比原生的I/O方法仍是略显麻烦,可是咱们若是把数据当作一堆煤矿,把ByteBuffer当作装煤的矿车,而FileChannel当作是运煤的矿道,那么上面的过程就演变成了:先打通一条矿道,而后把煤矿装在小车里运出来。形象的记忆更利于理解这个过程。
而writeFile()函数也是相似,为了更好的理解Buffer的属性,我特地将buffer的大小设置为10,为要写入的字符串长度为11个字节。首先仍是经过FileOutputStream.getChannel()方法获得FileChannel对象,并建立一个10字节大小的缓冲区,接着定义一个整型变量current指向要写入的字符串的当前下标,每次向buffer中put10个字节,并更新current,经过buffer.position()方法能够获得buffer被填充以后指针的位置,也就是缓冲区里的字节个数,而后翻转指针,最后经过FileChannel.write(buffer)方法将buffer写入到文件中。
一样考虑一下形象化的过程:咱们首先把煤矿装入小车(buffer.put()),并打开一条通往矿山的矿道(FileOutputStream.getChannel()),接着把煤矿运输进去(FileChannel.write(buffer))。仍是很容易理解的吧!
在另外一篇博客中介绍了阻塞式TCP的使用,接下来会介绍一下非阻塞式的TCP使用。
Socket通道与文件通道有着不一样的特征,最显著的就是能够运行非阻塞模式而且是能够选择的。在2.2.1节中咱们讲到Socket通道都继承自AbstractSelectableChannel类,而文件通道没有,而这个类就是Socket通道拥有非阻塞和可选择特色的关键。下面是SelectableChannel的几个方法句柄:
public abstract boolean isBlocking(); public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
从这两个方法句柄能够看到,设置一个socket通道的非阻塞模式只须要:
socketChannel.configureBlocking(false)
便可。而有条件的选择(readiness selection)是一种能够用来查询通道的机制,该查询能够判断通道是否准备好执行一个目标操做,好比read、write或accept。这个特性是在SelectableChannel类和SelectionKey类中进行了定义。
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
SelectionKey中的四个常量定义了socket通道的四种状态,而SelectableChannel的register方法正好返回了SelectionKey对象。
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
socket通道与文件通道不一样,并非经过socket.getChannel()来建立对象(尽管socket对象有这个方法),而是经过SocketChannel.open()这样的静态工厂方法去建立对象。每个socket通道有与之关联的一个socket对象,却并非全部的socket对象都有一个关联的通道,若是用传统的方法建立了一个socket对象,则它不会有一个关联的通道而且getChannel()方法老是返回null。
SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false);
这样就建立了一个非阻塞的socket通道。
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open( ) throws IOException; public abstract ServerSocket socket( ); public abstract ServerSocket accept( ) throws IOException; public final int validOps( ); }
ServerSocketChannel与SocketChannel和DatagramChannel不一样,它自己是不传输数据的,提供的接口很是简单,若是要进行数据读写,须要经过ServerSocketChannel.socket()方法返回一个与之关联的ServerSocket对象来进行。
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket ss = ssc.socket(); ss.bind(new InetSocketAddress(port));
ServerSocketChannel同ServerSocket同样也有accept()方法,当调用ServerSocket的accept()函数时只能是阻塞式的,而调用ServerSocketChannel的accept()函数却能够是非阻塞式。
下面这个例子展现了ServerSocketChannel的用法:
public class Server { static int port = 20001; public static void main(String[] args) throws IOException, InterruptedException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); String string = "hello client"; ByteBuffer buffer = ByteBuffer.wrap(string.getBytes()); ByteBuffer in = ByteBuffer.allocate(1024); System.out.println("Server wait for connection..."); while (true) { SocketChannel sc = ssc.accept(); if (sc == null) { TimeUnit.SECONDS.sleep(1); }else { //rewind只是将position调到0,不会改变Limit的值,而flip是将limit调整成position,再把position改为0 System.out.println("Server get a connection..."); sc.read(in); in.flip(); buffer.rewind(); sc.write(buffer); System.out.println("From client:" + new String(in.array(), "UTF-8")); } } } }
选择器实际上是一种多路复用机制在Java语言中的应用,在学习Selector以前有必要学习一下I/O多路复用的概念。
在以前的文章中咱们已经看到对于每一个客户端请求都分配一个线程的设计,或者是利用线程池来处理客户端请求,可是这样的设计对于处理客户端有大量请求的状况都一筹莫展。缘由在于首先线程很是消耗系统资源,其次阻塞式的设计在某一个请求发送的数据很大时会使其余请求等待好久。那么究竟有没有其余方法来解决这个问题呢?早在上世纪80年代在Unix系统中就已经提出select模型来解决这个问题,在以后对select进行优化又提出了poll模型和epoll模型(Linux专有)。
select/poll/epoll其实都是一种多路复用模型,什么是多路复用,开始听见这个名词我也是一脸懵逼,以为好像很高大上很难理解的样子。后面经过看书和看知乎上的形象化描述,慢慢理解了其实多路复用也没有想象着那么难。咱们若是把每一个客户端请求当作一个电路,以下图,那么是否有必要为每条电路都分配一条专有的线路呢?仍是当电流来了进行开关切换?很明显,后者只须要一个开关就能够节省大量的没必要要开销。select模型其实就是这样作的,监控全部的socket请求,当某个socket准备好(read/write/accept)时就进行处。可是如何作到监控全部socket的状态呢,select作的很简单,也许你也想到了,就是去轮询全部socket的状态,这样很明显当socket数量比较大时效率很是低。而且select对于监控的socket数量有限制,默认是1024个。poll模型进行了一些改进,可是并无本质的改变。到了epoll模型,就有了很是大的改观。假象另外一个场景,若是你是一个监考老师,考试结束时要去收卷子,你按照常理一个一个的收着,一旦有一个学生还没写完,因而你就会卡(阻塞)在那,而且整个轮询一遍下来很是慢。因此你想到了吗?让那些已经作完的学生举手告知你他已经作完了,你再过去收一下卷子便可。这样很明显阻塞会大幅度减小。这就是epoll,让那些已经准备好的socket发出通知,而后来处理它。
若是仍是不理解,能够看看知乎上的一些回答。
好了,废话这么多,已是能够理解多路复用是什么了。Java语言直到JDK1.4版本才有多路复用这个概念,很大缘由也是由于没人用Java去写服务器,例如著名的Apache和Nginx都是用C/C++写的。接下来对NIO中多路复用的实现进行介绍。
NIO处理多路复用请求只须要三个组件:可选择的通道(SelectableChannels)、选择器(Selector)和选择键(SelectionKey),他们之间的关系以下图所示:
可选择的通道能够主动注册到一个选择器上,并指定对哪些动做是感兴趣的。这个注册行为会返回一个选择键,选择键封装了该通道和选择器之间的注册关系,包含两个比特集:指示该注册关系所关心的通道操做;通道已经准备好的操做。选择器是核心组件,它管理着注册在其上的通道集合的信息和它们的就绪状态。值得注意的是,通道在注册到一个选择器以前,必须设置为非阻塞模式。缘由在这里。
经过静态工厂方法建立一个选择器。
Selector selector = Selector.open();
这是通道拥有的方法,先看看方法句柄:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); }
值得注意的是第二个参数ops,这个参数表示了该通道感兴趣的操做,全部的操做包括读(read)、写(write)、链接(connect)和接受(accept)。并非全部通道都支持这些操做,好比SocketChannel就没有accept这个操做,由于这是专属于ServerSocketChannel的操做。能够经过调用Channel.validOps()来查询支持的操做。
第三个参数是用来传递一个对象的引用,在调用新生成的选择键的attach()方法时会返回该对象的引用。
选择器的核心功能是选择过程,选择器其实是对select()、poll()等本地系统调用的一个封装。每个选择器会维护三个键集合:已注册的键集合、已选择的键集合和已取消的键集合。经过执行Selector.select()、Selector.select(int timeout)或Selector.selectNow(),选择过程被调用,这时会执行如下步骤:
好不容易才写完上面这段,由于我在看原书的时候看了2-3遍才看懂,过程仍是比较复杂的,我以为是时候去看看Unix中的select()是怎么作的,也许这样更利于理解这个选择过程。
说了这么多原理,不知道你晕没晕,反正我是快晕了。这时候来一段实战代码,告诉你了解了这么多,到底该怎么用!
一般的作法以下:在选择器上调用一次select操做(这会更新已选择键的集合),而后遍历selectedKeys返回的键的集合。接着键将从已选择的键的集合中被移除(经过Iterator.remove()方法),而后检测下一个键。完成后,继续下一次select操做。
服务端程序演示:
public class SelectorTest { public static void main(String[] args) throws IOException { new SelectorTest().select(); } public void select() throws IOException { //建立选择器 Selector selector = Selector.open(); //建立serverChannel ServerSocketChannel ssc = ServerSocketChannel.open(); //设置为非阻塞模式 ssc.configureBlocking(false); //绑定监听的地址 ssc.socket().bind(new InetSocketAddress(20000), 1024); //将serverChannel注册到选择器上,监听accept事件,返回选择键 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { //这次选择过程准备就绪的通道数量 int num = selector.select(); if (num == 0) { //若没有准备好的就继续循环 continue; } //返回已就绪的键集合 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); handle(selector, key); //由于已经处理了该键,因此把当前的key从已选择的集合中去除 it.remove(); } } } public void handle(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { //当一个ServerChannel为accept状态时,注册这个ServerChannel的SocketChannel为可读取状态 if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); //把通道注册到选择器以前要设置为非阻塞,不然会报异常 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } //若是channel是可读取状态,则读取其中的数据 if (key.isReadable()) { //只有SocketChannel才能读写数据,因此若是是可读取状态,只能是SocketChannel SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer in = ByteBuffer.allocate(1024); //将socketChannel中的数据读入到buffer中,返回当前字节的位置 int readBytes = sc.read(in); if (readBytes > 0) { //把buffer的position指针指向buffer的开头 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The server receive : " + body); //把response输出到socket中 doWrite(sc, "Hello client"); } else if (readBytes < 0) { key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel sc, String response) throws IOException { //把服务器端返回的数据写到socketChannel中 if (response == null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } } }
代码相较于阻塞式TCP服务端程序复杂了太多倍,可是基本思路跟我上面那段话写的是同样的,并且基本每一段代码都写了注释,耐心看下去确定看的懂。我就再也不解释这段代码啦。
客户端演示:
public class Client { public static final int PORT = 20000; public static final String HOST = "127.0.0.1"; private volatile boolean stop = false; public static void main(String[] args) throws IOException { new Client().select(); } public void select() throws IOException { // 建立选择器 Selector selector = Selector.open(); // 建立SocketChannel SocketChannel sc = SocketChannel.open(); // 设置为非阻塞模式 sc.configureBlocking(false); try { doConnect(selector, sc); } catch (Exception e) { e.printStackTrace(); System.exit(1); } while (!stop) { int num = selector.select(); if (num == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); try { handleKeys(selector, key); } catch (Exception e) { e.printStackTrace(); if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } // 由于已经处理了该键,因此把当前的key从已选择的集合中去除 it.remove(); } } if (selector != null) { selector.close(); } } private void doWrite(SocketChannel sc, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send msg successfully"); } } } private void handleKeys(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); // 判断是否链接成功 if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello Server"); } else { System.exit(1); } } if (key.isReadable()) { ByteBuffer in = ByteBuffer.allocate(1024); // 将socketChannel中的数据读入到buffer中,返回当前字节的位置 int readBytes = sc.read(in); if (readBytes > 0) { // 把buffer的position指针指向buffer的开头 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The Client receive : " + body); this.stop = true; } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else { // 读到0字节,忽略 } } } } private void doConnect(Selector selector, SocketChannel sc) throws IOException { if (sc.connect(new InetSocketAddress(HOST, PORT))) { System.out.println("Client connect successfully..."); // 若是直接链接成功,则注册读操做 sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello server!"); } else { // 若是没有链接成功,则注册链接操做 sc.register(selector, SelectionKey.OP_CONNECT); } } }
客户端跟服务端很类似,惟一不一样的是服务端须要监测的socket行为是OP_ACCEPT和OP_READ,而客户端须要监控的是OP_CONNECT和OP_READ,其余的区别不是很大。
依次运行服务器端和客户端,结果以下:
代码在个人github repo上也能够找到。
花了大概三天的时间,把《Java NIO》这本书看了一遍并记录了下来学习过程,而且结合《Netty权威指南》中的例子去实践了一下,慢慢感受到NIO的魅力。反思一下学习的比较慢的缘由,应该是对Unix上的I/O模型不熟悉致使的,因此以为接下来好好学习一下select、poll、epoll,加深对多路复用的理解。
本文中可能存在理解有误差的地方,也请多多指正。