Java IO编程全解(四)——NIO编程

  转载请注明出处:http://www.cnblogs.com/Joanna-Yan/p/7793964.html html

  前面讲到:Java IO编程全解(三)——伪异步IO编程java

  NIO,即New I/O,这是官方叫法,由于它相对于以前的I/O类库是新增的。可是,因为以前老的I/O类库是阻塞I/O,New I/O类库的目标就是要让Java支持非阻塞I/O,因此,更多的人喜欢称之为非阻塞I/O(Non-block I/O),因为非阻塞I/O更可以体现NIO的特色,因此这里使用的NIO都是指非阻塞I/O。编程

  与Socket类和ServerSocket类相对应,NIO也提供了SocketChannel和ServerSocketChannel两种不一样的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式。阻塞模式使用很是简单,可是性能和可靠性都很差,非阻塞则正好相反。开发人员通常能够根据本身的须要来选择合适的模式,通常来讲,低负载、低并发的应用程序能够选择同步阻塞I/O以下降编程复杂度,可是对于高负载、高并发的网络应用,须要使用NIO的非阻塞模式进行开发。数组

1.NIO类库简介

  新的输入/输出(NIO)库是在JDK1.4中引入的。NIO弥补了原来同步阻塞I/O的不足,它在标准Java代码中提供了高速的、面向块的I/O。经过定义包含数据的类,以及经过以块的形式处理这些数据,NIO不使用本机代码就能够利用低级优化,这是原来的I/O包所没法作到的。下面对NIO的一些概念和功能作下简单介绍,以便你们可以快速地了解NIO类库和相关概念。服务器

  1.缓冲区Buffer微信

  Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中,能够将数据直接写入或者将数据直接读到Stream对象中。网络

  在NIO库中,全部数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任什么时候候访问NIO中的数据,都是经过缓冲区进行操做。并发

  缓冲区实质上是一个数组。一般它是一个字节数组(ByteBuffer),也可使用其余种类的数组。可是缓冲区不只仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息。异步

  最经常使用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操做byte数组。除了ByteBuffer,还有其余的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区,具体以下:socket

  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • ShortBuffer:短整型缓冲区
  • IntBuffer:整型缓冲区
  • LongBuffer:长整型缓冲区
  • FloatBuffer:浮点型缓冲区
  • DoubleBuffer:双精度浮点型缓冲区

   每个Buffer类都是Buffer接口的一个子实例。除了ByteBuffer,每个Buffer类都有彻底同样的操做,只是它们所处理的数据类型不同。由于大多数标准I/O操做都是使用ByteBuffer,因此它除了具备通常缓冲区的操做以外还提供一些特有的操做,方便网络读写。

  2.通道Channel

  Channel是一个通道,能够经过它读取和写入数据,它就像自来水管同样,网络数据经过Channel读取和写入。通道与流的不一样之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),并且通道能够用于读、写或者同时读写。由于Channel是全双工的,因此它能够比流更好地映射底层操做系统的API。

  3.多路复用器Selector

  多路复用器Selector是Java NIO编程的基础,熟练地掌握Selector对于掌握NIO编程相当重要。多路复用器提供选择已经就绪的任务的能力。简单来说,Selector会不断地轮询注册在其上的Channel,若是某个Channel上面有新的TCP链接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,而后经过SelectionKey能够获取就绪Channel的集合,进行后续的I/O操做。

  一个多路复用器Selector能够同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,因此它并无最大链接句柄1024/2048的限制。这也就意味着只须要一个线程负责Selector的轮询,就能够接入成千上万的客户端,这确实是个很是巨大的进步。

2.NIO服务端序列图

  NIO服务端通讯序列图以下图所示:

  下面,咱们对NIO服务端的主要建立过程进行讲解和说明,做为NIO的基础入门,咱们将忽略掉一些在生产环境中部署所须要的一些特性和功能。

  步骤一:打开ServerSocketChannel,用于监听客户端的链接,它是全部客户端链接的父管道。

ServerSocketChannel acceptorSvr=ServerSocketChannel.open();

  步骤二:绑定监听端口,设置链接为非阻塞模式。

acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptorSvr.configureBlocking(false);

  步骤三:建立Reactor线程,建立多路复用器并启动线程。

Selector selector=Selector.open();
New Thread(new ReactorTask()).start();

  步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件。

SelectionKey key=acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);

  步骤五:多路复用器在线程run方法的无线循环体内轮询准备就绪的Key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey )it.next();
  //...deal with I/O event...
}

  步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,创建物理链路。

SocketChannel channel=svrChannel.accpet();

  步骤七:设置客户端链路为非阻塞模式。

channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
......

  步骤八:将新接入的客户端链接注册到Reactor线程的多路复用器,监听读操做,用来读取客户端发送的网络消息。

SelectionKey key=socketChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤九:异步读取客户端请求消息到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十:对ByteBuffer进行编解码,若是有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。

Object message=null;
while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}
if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}
if(messageList!=null& !messageList.isEmpty()){
  for(Object messageE: messageList){
    handlerTask(messageE);
  }
}

  步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端。

socketChannel.write(buffer);

  注意:若是发送区TCP缓冲区满,会致使写半包,此时,须要注册监听写操做位,循环写,直到整包消息写入TCP缓冲区。

  当咱们了解建立NIO服务端的基本步骤以后,下面咱们将前面的时间服务器程序经过NIO重写一遍,让你们可以学习到完整版的NIO服务端建立。

3.NIO建立的TimeServer源码分析

package joanna.yan.nio;

public class TimeServer {

    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }
        
        MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
 * 多路复用类
 * 它是一个独立的线程,负责轮询多路复器Selector,能够处理多个客户端的并发接入。
 * @author Joanna.Yan
 * @date 2017年11月6日下午3:51:41
 */
public class MultiplexerTimeServer implements Runnable{

        private Selector selector;//多路复用器
        private ServerSocketChannel servChannel;
        private volatile boolean stop;
        
        /**
         * 初始化多路复用器、绑定监听端口
         * @param port
         */
        public MultiplexerTimeServer(int port){
            try {
                selector=Selector.open();
                servChannel=ServerSocketChannel.open();
                servChannel.configureBlocking(false);
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port: "+port);
                
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    public void stop(){
            this.stop=true;
      }
    @Override
public void run() { while(!stop){ try { //设置selector的休眠时间为1s,不管是否有读写等事件发生,selector每隔1s都被唤醒一次。 selector.select(1000); //当有处于就绪状态的Channel时,selector就返回就绪状态的Channel的SelectionKey集合。 Set<SelectionKey> selectedKeys=selector.selectedKeys(); Iterator<SelectionKey> it=selectedKeys.iterator(); SelectionKey key=null; //经过对就绪状态的Channel集合进行迭代,能够进行网络的异步读写操做。 while(it.hasNext()){ key=it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } }        /* * 多路复用器关闭后,全部注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,因此不须要重复释放资源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }     private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //处理新接入的请求消息 //经过SelectionKey的操做位进行判断便可获知网络事件类型 if(key.isAcceptable()){ //Accept the new connection ServerSocketChannel ssc=(ServerSocketChannel) key.channel(); SocketChannel sc=ssc.accept(); //-----以上操做至关于完成了TCP的三次握手,TCP物理链路正式创建------ //将新建立的SocketChannel设置为异步非阻塞,同时也能够对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等。 sc.configureBlocking(false); //Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); }          if(key.isReadable()){ //Read the data SocketChannel sc=(SocketChannel) key.channel(); //因为实现咱们得知客户端发送的码流大小,做为例程,咱们开辟一个1K的缓冲区 ByteBuffer readBuffer=ByteBuffer.allocate(1024); //因为已经设置SocketChannel为异步非阻塞模式,所以它的read是非阻塞的。 int readBytes=sc.read(readBuffer); /* * readBytes>0 读到了字节,对字节进行编解码; * readBytes=0 没有读取到字节,属于正常场景,忽略; * readByte=-1 链路已经关闭,须要关闭SocketChannel,释放资源 */             if(readBytes>0){ //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操做。 readBuffer.flip(); //根据缓冲区可读的字节个数建立字节数组 byte[] bytes=new byte[readBuffer.remaining()]; //调用ByteBuffer的get操做将缓冲区可读的字节数组复制到新建立爱你的字节数组中 readBuffer.get(bytes); String body=new String(bytes, "UTF-8"); System.out.println("The time server receive order: "+body); //若是请求指令是"QUERY TIME ORDER"则把服务器的当前时间编码后返回给客户端 String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes<0){              //对端链路关闭 key.cancel(); sc.close(); }else{ //读到0字节,忽略 } } } }     private void doWrite(SocketChannel channel,String response) throws IOException{ if(response!=null&& response.trim().length()>0){ byte[] bytes=response.getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length); //调用ByteBuffer的put操做将字节数组复制到缓冲区 writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); /* * 须要指出的是,因为SocketChannel是异步非阻塞的,它并不保证一次性可以把须要发送的字节数组发送完, * 此时会出现“写半包”问题,咱们须要注册写操做,不断轮询Selector,将没有发送完毕的ByteBuffer发送完毕, * 能够经过ByteBuffer的hasRemaining()方法判断消息是否发送完成。 * 此处仅仅是各简单的入门级例程,没有演示如何处理“写半包”场景,后面会说到。 */ } } }

4.NIO客户端序列图

  NIO客户端建立序列图如图所示。

  步骤一:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址)

SocketChannel clientChannel=SocketChannel.open();

  步骤二:设置SocketChannel为非阻塞模式,同时设置客户端链接的TCP参数。

clientChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);

  步骤三:异步链接服务端。

boolean connected=clientChannel.connect(new InetSocketAddress("ip",port));

  步骤四:判断是否链接成功,若是链接成功,则直接注册读状态位到多路复用器中,若是当前没有链接成功(异步链接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路尚未创建)。

if(connected){
  clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
}else{
  clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
}

  步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP ACK应答。

clientChannel.register(selector,SelectionKay.OP_CONNECT,ioHandler);

  步骤六:建立Reactor线程,建立多路复用器并启动线程。

Selector selector=Selector.open();
new Thread(new ReactorTask()).start();

  步骤七:多路复用器在线程run方法的无限循环体内轮询准备就绪的key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey)it.next();
  //...deal with I/O event...
}

  步骤八:接收connect事件进行处理。

if(key.isConnectable()){
  //handlerConnect();
}

  步骤九:判断链接结果,若是链接成功,注册读事件到多路复用器。

if(channel.finishConnect()){
  registerRead();
}

  步骤十:注册读事件到多路复用器。

clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤十一:异步读客户端请求消息到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十二:对ByteBuffer进行编解码,若是有半包消息接收缓冲区Reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。

Object message=null;

while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}

if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}

if(messageList!=null & !messageList.isEmpty()){
  for(Object messageE:messageList){
    handlerTask(messageE);
  }
}

  步骤十三:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端。

socketChannel.wirte(buffer);

5.NIO建立的TimeClient源码分析

package joanna.yan.nio;

public class TimeClient {
    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }
        
        new Thread(new TimeClientHandle("127.0.0.1", port),"TimClient-001").start();
    }
}
package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * 处理异步链接和读写操做
 * @author Joanna.Yan
 * @date 2017年11月6日下午4:33:14
 */
public class TimeClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;
    
    /**
     * 初始化NIO的多路复用器和SocketChannel对象
     * @param host
     * @param port
     */
        public TimeClientHandle(String host,int port){
        this.host=host==null ? "127.0.0.1" : host;
        this.port=port;
        try {
            selector=Selector.open();
            socketChannel=SocketChannel.open();
            //设置为异步非阻塞模式,同时还能够设置SocketChannel的TCP参数。例如接收和发送的TCP缓冲区大小
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
@Override
public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while(!stop){ try { selector.select(1000); Set<SelectionKey> selectedKeys=selector.selectedKeys(); Iterator<SelectionKey> it=selectedKeys.iterator(); SelectionKey key=null; while(it.hasNext()){//轮询多路复用器Selector,当有就绪的Channel时 key=it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } //多路复用器关闭后,全部注册在上面的Channel和Pipe等资源都会被自动注册并关闭,因此不须要重复释放资源。 /* * 因为多路复用器上可能注册成千上万的Channel或者pipe,若是一一对这些资源进行释放显然不合适。 * 所以,JDK底层会自动释放全部跟此多路复用器关联的资源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //多路复用器关闭后,全部注册在上面的Channel和Pipe等资源都会被自动注册并关闭,因此不须要重复释放资源。 /* * 因为多路复用器上可能注册成千上万的Channel或者pipe,若是一一对这些资源进行释放显然不合适。 * 所以,JDK底层会自动释放全部跟此多路复用器关联的资源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws ClosedChannelException, IOException { if(key.isValid()){ //判断是否链接成功 SocketChannel sc=(SocketChannel) key.channel(); if(key.isConnectable()){//处于链接状态,说明服务器已经返回ACK应答消息 if(sc.finishConnect()){//对链接结果进行判断 /* * 将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操做位, * 监听网络读操做,而后发送请求消息给服务端。 */ sc.register(selector, SelectionKey.OP_READ); doWrite(sc); }else{ System.exit(1);//链接失败,进程退出 } } if(key.isReadable()){ //开辟缓冲区 ByteBuffer readBuffer=ByteBuffer.allocate(1024); //异步读取 int readBytes=sc.read(readBuffer); if(readBytes>0){ readBuffer.flip(); byte[] bytes=new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body=new String(bytes, "UTF-8"); System.out.println("Now is: "+body); this.stop=true; }else if(readBytes<0){ //对端链路关闭 key.cancel(); sc.close(); }else{ //读到0字节,忽略 } } } } private void doConnect() throws IOException { //若是直接链接成功,则将SocketChannel注册到多路复用器Selector上,发送请求消息,读应答 if(socketChannel.connect(new InetSocketAddress(host, port))){ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{ /* * 若是没有直接链接成功,则说明服务端没有返回TCP握手应答信息,但这并不表明链接失败, * 咱们须要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT, * 当服务端返回TCP syn-ack消息后,Selector就能轮询到整个SocketChannel处于链接就绪状态。 */ socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { byte[] req="QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); //写入到发送缓冲区中 writeBuffer.put(req); writeBuffer.flip(); //因为发送是异步的,因此会存在"半包写"问题,此处不赘述 sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){//若是缓冲区中的消息所有发送完成 System.out.println("Send order 2 server succeed."); } } }

  经过源码对比分析发现,NIO编程难度确实比同步阻塞BIO大不少,此处咱们的NIO例程并无考虑“半包读”和“半包写”,若是加上这些,代码会更加复杂。NIO代码既然这么复杂,为何它的应用却愈来愈普遍呢,使用NIO编程的优势总结以下:

  1. 客户端发起的链接操做是异步的,能够经过多路复用器注册OP_CONNECT等待后续结果,不须要像以前的客户端那样被同步阻塞。
  2. SocketChannel的读写操做都是异步的,若是没有可读写的数据它不会同步等待,直接返回,这样I/O通讯线程就能够处理其余的链路,不须要同步等待这个链路可用。
  3. 线程模型的优化:因为JDK的Selector在Linux等主流操做系统上经过epoll实现,它没有链接句柄数的限制(只受限于操做系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程能够同时处理成千上万个客户端链接,并且性能不会随着客户端的增长而线性降低,所以,它很是适合作高性能、高负载的网络服务器。

  JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO 2.0。引入注目的是,Java正式提供了异步文件I/O操做,同时提供了与UNIX网络编程事件驱动I/O对应的AIO。

Java IO编程全解(五)——AIO编程

若是此文对您有帮助,微信打赏我一下吧~

相关文章
相关标签/搜索