咱们熟知的Socket编程就是BIO,每一个请求对应一个线程去处理。一个socket链接一个处理线程(这个线程负责这个Socket链接的一系列数据传输操做)。阻塞的缘由在于:操做系统容许的线程数量是有限的,多个socket申请与服务端创建链接时,服务端不能提供相应数量的处理线程,没有分配处处理线程的链接就会阻塞等待或被拒绝。java
以下图就是BIO(1:1同步阻塞)通讯模型,每当有一个请求过来,都会建立新的线程,当线程数达到必定数量,占满了整台机器的资源,那么机器就挂掉了。对于CPU来讲也是一个很差的事情,由于会致使频繁的切换上下文。react
那么咱们有以下的改进措施(M:N同步阻塞IO),可是仍是有上面的一些问题,仅仅是解决了频繁的建立线程的问题,不过因为是同步,若是读写速度慢,那么每一个线程进来是会致使阻塞的,性能的高低彻底取决于阻塞的时间。这个对于用户的体验也是至关很差的。linux
NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,能够同时作其余任务。同步的核心就是 Selector,Selector 代替了线程自己轮询 IO 事件,避免了阻塞同时减小了没必要要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,能够经过写道缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。编程
非阻塞式IO模型(NIO)NIO+单线程Reactor模式:reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。设计模式
reactor主要由如下几个角色构成:handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handlerapi
处理流程:数组
这种模型状况下,因为 acceptor 是单线程的,既要接受请求,还要去处理时间,若是某一些事件处理请求花费的时间比较长,那么这个请求将会进入等待,整个状况下会同步。基于这种问题下咱们有什么改进措施呢?ruby
非阻塞式IO模型(NIO)NIO+多线程Reactor模式:可使用多线程去处理,使用线程池,让acceptor仅仅去接受请求,把事件的处理交给线程池中的线程去处理:服务器
那么在这种状况下还存在哪些弊端呢?将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程。仍是acceptor是单线程的,没法去并行的去响应多个客户端,那么要怎么处理呢?网络
NIO+主从多线程Reactor模式:
mainReactor负责监听链接,accept链接给subReactor处理,为何要单独分一个Reactor来处理监听呢?由于像TCP这样须要通过3次握手才能创建链接,这个创建链接的过程也是要耗时间和资源的,单独分一个Reactor来处理,能够提升性能。
异步阻塞IO(AIO):
NIO是同步的IO,是由于程序须要IO操做时,必须得到了IO权限后亲自进行IO操做才能进行下一步操做。AIO是对NIO的改进(因此AIO又叫NIO.2),它是基于Proactor模型的。每一个socket链接在事件分离器注册 IO完成事件 和 IO完成事件处理器。程序须要进行IO时,向分离器发出IO请求并把所用的Buffer区域告知分离器,分离器通知操做系统进行IO操做,操做系统本身不断尝试获取IO权限并进行IO操做(数据保存在Buffer区),操做完成后通知分离器;分离器检测到 IO完成事件,则激活 IO完成事件处理器,处理器会通知程序说“IO已完成”,程序知道后就直接从Buffer区进行数据的读写。
也就是说:AIO是发出IO请求后,由操做系统本身去获取IO权限并进行IO操做;NIO则是发出IO请求后,由线程不断尝试获取IO权限,获取到后通知应用程序本身进行IO操做。
同步/异步:数据若是还没有就绪,是否须要等待数据结果。
阻塞/非阻塞:进程/线程须要操做的数据若是还没有就绪,是否妨碍了当前进程/线程的后续操做。应用程序的调用是否当即返回!
NIO与BIO最大的区别是 BIO是面向流的,而NIO是面向Buffer的。
NIO还提供了两个新概念:Buffer和Channel:
Buffer: 是一块连续的内存块,是 NIO 数据读或写的中转地。 为何说NIO是基于缓冲区的IO方式呢?由于,当一个连接创建完成后,IO的数据未必会立刻到达,为了当数据到达时可以正确完成IO操做,在BIO(阻塞IO)中,等待IO的线程必须被阻塞,以全天候地执行IO操做。为了解决这种IO方式低效的问题,引入了缓冲区的概念,当数据到达时,能够预先被写入缓冲区,再由缓冲区交给线程,所以线程无需阻塞地等待IO。
Channel: 数据的源头或者数据的目的地 ,用于向 buffer 提供数据或者读取 buffer 数据 ,buffer 对象的惟一接口,异步 I/O 支持。
Buffer做为IO流中数据的缓冲区,而Channel则做为socket的IO流与Buffer的传输通道。客户端socket与服务端socket之间的IO传输不直接把数据交给CPU使用,而是先通过Channel通道把数据保存到Buffer,而后CPU直接从Buffer区读写数据,一次能够读写更多的内容。使用Buffer提升IO效率的缘由(这里与IO流里面的BufferedXXStream、BufferedReader、BufferedWriter提升性能的原理同样):IO的耗时主要花在数据传输的路上,普通的IO是一个字节一个字节地传输,而采用了Buffer的话,经过Buffer封装的方法(好比一次读一行,则以行为单位传输而不是一个字节一次进行传输)就能够实现“一大块字节”的传输。好比:IO就是送快递,普通IO是一个快递跑一趟,采用了Buffer的IO就是一车跑一趟。很明显,buffer效率更高,花在传输路上的时间大大缩短。
面向buffer的通道,一个Channel(通道)表明和某一实体的链接,这个实体能够是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于咱们的程序和操做系统底层I/O服务进行交互。通道是一种很基本很抽象的描述,和不一样的I/O服务交互,执行不一样的I/O操做,实现不同,所以具体的有FileChannel、SocketChannel,ServerSocketChannel,DatagramChannel等。通道使用起来跟Stream比较像,能够读取数据到Buffer中,也能够把Buffer中的数据写入通道。可是channel是双向的,而stream是单向的。
在Java NIO中,若是两个通道中有一个是FileChannel,那你能够直接将数据从一个channel传输到另一个channel。对应的api是 transferFrom() 跟transferTo()。
buffer:
与Java基本类型相对应,NIO提供了多种 Buffer 类型,如ByteBuffer、CharBuffer、IntBuffer等,区别就是读写缓冲区时的单位长度不同(以对应类型的变量为单位进行读写)。Buffer中有3个很重要的变量,它们是理解Buffer工做机制的关键,分别是capacity (总容量),position (指针当前位置),limit (读/写边界位置)。
Buffer的工做方式跟C语言里的字符数组很是的像,类比一下,capacity就是数组的总长度,position就是咱们读/写字符的下标变量,limit就是结束符的位置。Buffer初始时3个变量的状况以下图:
在对Buffer进行读/写的过程当中,position会日后移动,而 limit 就是 position 移动的边界。由此不难想象,在对Buffer进行写入操做时,limit应当设置为capacity的大小,而对Buffer进行读取操做时,limit应当设置为数据的实际结束位置。(注意:将Buffer数据 写入 通道是Buffer 读取 操做,从通道 读取 数据到Buffer是Buffer 写入 操做)
在对Buffer进行读/写操做前,咱们能够调用Buffer类提供的一些辅助方法来正确设置 position 和 limit 的值,主要有以下几个:
java 层面中Buffer是一个顶层抽象类,咱们须要先了解一下经常使用的实现进行数据的编/解码:
public class BufferDemo { public static void decode(String str) throws UnsupportedEncodingException { // 开辟一个长度为128的字节空间 ByteBuffer byteBuffer = ByteBuffer.allocate(128); //写入数据 byteBuffer.put(str.getBytes("UTF-8")); //写完数据之后要进行读取,须要设置 limit 为 position 的值,而后 position 置为0。 byteBuffer.flip(); /*获取utf8的编解码器*/ Charset utf8 = Charset.forName("UTF-8"); CharBuffer charBuffer = utf8.decode(byteBuffer);/*对bytebuffer中的内容解码*/ /*array()返回的就是内部的数组引用,编码之后的有效长度是0~limit*/ char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit()); System.out.println(charArr); } public static void encode(String str){ CharBuffer charBuffer = CharBuffer.allocate(128); charBuffer.append(str); charBuffer.flip(); /*对获取utf8的编解码器*/ Charset utf8 = Charset.forName("UTF-8"); ByteBuffer byteBuffer = utf8.encode(charBuffer); /*对charbuffer中的内容解码*/ /*array()返回的就是内部的数组引用,编码之后的有效长度是0~limit*/ byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit()); System.out.println(Arrays.toString(bytes)); } public static void main(String[] args) throws UnsupportedEncodingException { BufferDemo.decode("解码测试"); BufferDemo.encode("编码测试"); } }
再来看看 FileChannel 的简单应用:
public class FileChannelDemo { public static void main(String[] args) throws Exception { /*-------从buffer往fileChannel中写入数据-------------------------*/ File file =new File("D:/nio.data"); if(!file.exists()) {//判断文件是否存在,不存在则建立 file.createNewFile(); } //获取输出流 FileOutputStream outputStream = new FileOutputStream(file); //从输出流中获取channel FileChannel writeFileChannel = outputStream.getChannel(); //开辟新的字节空间 ByteBuffer byteBuffer = ByteBuffer.allocate(128); //写入数据 byteBuffer.put("fileChannel hello".getBytes("UTF-8")); //刷新指针 byteBuffer.flip(); //进行写操做 writeFileChannel.write(byteBuffer); byteBuffer.clear(); outputStream.close(); writeFileChannel.close(); /*-------从fileChannel往buffer中写入数据-------------------------*/ Path path = Paths.get("D:/nio.data"); FileChannel readFileChannel = FileChannel.open(path); ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)readFileChannel.size()+1); Charset charset = Charset.forName("UTF-8"); readFileChannel.read(byteBuffer2); byteBuffer2.flip(); CharBuffer charBuffer = charset.decode(byteBuffer2); System.out.println(charBuffer.toString()); byteBuffer2.clear(); readFileChannel.close(); } }
selector:
Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。咱们先将通道注册到选择器,并设置好关心的事件,而后就能够经过调用select()方法,静静地等待事件发生。通道有以下4个事件可供咱们监听:
因为若是用阻塞I/O,须要多线程(浪费内存),若是用非阻塞I/O,须要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,经过Selector,咱们的线程只为已就绪的通道工做,不用盲目的重试了。好比,当全部通道都没有数据到达时,也就没有Read事件发生,咱们的线程会在select()方法处被挂起,从而让出了CPU资源。
结合上面的三大组件,来实现一下基本的NIO流程,服务端:
/*服务器端,:接收客户端发送过来的数据并显示, *服务器把上接收到的数据加上"echo from service:"再发送回去*/ public class ServiceSocketChannelDemo { public static class TCPEchoServer implements Runnable{ /*服务器地址*/ private InetSocketAddress localAddress; public TCPEchoServer(int port) throws IOException { this.localAddress = new InetSocketAddress(port); } @Override public void run(){ Charset utf8 = Charset.forName("UTF-8"); ServerSocketChannel ssc = null; Selector selector = null; Random rnd = new Random(); try { /*建立选择器*/ selector = Selector.open(); /*建立服务器通道*/ ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); /*设置监听服务器的端口,设置最大链接缓冲数为100*/ ssc.bind(localAddress, 100); /*服务器通道只能对tcp连接事件感兴趣*/ ssc.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e1) { System.out.println("server start failed"); return; } System.out.println("server start with address : " + localAddress); /*服务器线程被中断后会退出*/ try{ while(!Thread.currentThread().isInterrupted()){ int n = selector.select(); if(n == 0){ continue; } Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); /*防止下次select方法返回已处理过的通道*/ it.remove(); /*若发现异常,说明客户端链接出现问题,但服务器要保持正常*/ try{ /*ssc通道只能对连接事件感兴趣*/ if(key.isAcceptable()){ /*accept方法会返回一个普统统道, 每一个通道在内核中都对应一个socket缓冲区*/ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); /*向选择器注册这个通道和普统统道感兴趣的事件,同时提供这个新通道相关的缓冲区*/ int interestSet = SelectionKey.OP_READ; sc.register(selector, interestSet, new Buffers(256, 256)); System.out.println("accept from " + sc.getRemoteAddress()); } /*(普通)通道感兴趣读事件且有数据可读*/ if(key.isReadable()){ /*经过SelectionKey获取通道对应的缓冲区*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*经过SelectionKey获取对应的通道*/ SocketChannel sc = (SocketChannel) key.channel(); /*从底层socket读缓冲区中读入数据*/ sc.read(readBuffer); readBuffer.flip(); /*解码显示,客户端发送来的信息*/ CharBuffer cb = utf8.decode(readBuffer); System.out.println(cb.array()); readBuffer.rewind(); /*准备好向客户端发送的信息*/ /*先写入"echo:",再写入收到的信息*/ writeBuffer.put("echo from service:".getBytes("UTF-8")); writeBuffer.put(readBuffer); readBuffer.clear(); /*设置通道写事件*/ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } /*通道感兴趣写事件且底层缓冲区有空闲*/ if(key.isWritable()){ Buffers buffers = (Buffers)key.attachment(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); writeBuffer.flip(); SocketChannel sc = (SocketChannel) key.channel(); int len = 0; while(writeBuffer.hasRemaining()){ len = sc.write(writeBuffer); /*说明底层的socket写缓冲已满*/ if(len == 0){ break; } } writeBuffer.compact(); /*说明数据所有写入到底层的socket写缓冲区*/ if(len != 0){ /*取消通道的写事件*/ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } } }catch(IOException e){ System.out.println("service encounter client error"); /*若客户端链接出现异常,从Seletcor中移除这个key*/ key.cancel(); key.channel().close(); } } Thread.sleep(rnd.nextInt(500)); } }catch(InterruptedException e){ System.out.println("serverThread is interrupted"); } catch (IOException e1) { System.out.println("serverThread selecotr error"); }finally{ try{ selector.close(); }catch(IOException e){ System.out.println("selector close failed"); }finally{ System.out.println("server close"); } } } } public static void main(String[] args) throws InterruptedException, IOException{ Thread thread = new Thread(new TCPEchoServer(8080)); thread.start(); Thread.sleep(100000); /*结束服务器线程*/ thread.interrupt(); } }
Buffers:
/*自定义Buffer类中包含读缓冲区和写缓冲区,用于注册通道时的附加对象*/ public class Buffers { ByteBuffer readBuffer; ByteBuffer writeBuffer; public Buffers(int readCapacity, int writeCapacity){ readBuffer = ByteBuffer.allocate(readCapacity); writeBuffer = ByteBuffer.allocate(writeCapacity); } public ByteBuffer getReadBuffer(){ return readBuffer; } public ByteBuffer gerWriteBuffer(){ return writeBuffer; } }
客户端:
/*客户端:客户端每隔1~2秒自动向服务器发送数据,接收服务器接收到数据并显示*/ public class ClientSocketChannelDemo { public static class TCPEchoClient implements Runnable{ /*客户端线程名*/ private String name; private Random rnd = new Random(); /*服务器的ip地址+端口port*/ private InetSocketAddress remoteAddress; public TCPEchoClient(String name, InetSocketAddress remoteAddress){ this.name = name; this.remoteAddress = remoteAddress; } @Override public void run(){ /*建立解码器*/ Charset utf8 = Charset.forName("UTF-8"); Selector selector; try { /*建立TCP通道*/ SocketChannel sc = SocketChannel.open(); /*设置通道为非阻塞*/ sc.configureBlocking(false); /*建立选择器*/ selector = Selector.open(); /*注册感兴趣事件*/ int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; /*向选择器注册通道*/ sc.register(selector, interestSet, new Buffers(256, 256)); /*向服务器发起链接,一个通道表明一条tcp连接*/ sc.connect(remoteAddress); /*等待三次握手完成*/ while(!sc.finishConnect()){ ; } System.out.println(name + " " + "finished connection"); } catch (IOException e) { System.out.println("client connect failed"); return; } /*与服务器断开或线程被中断则结束线程*/ try{ int i = 1; while(!Thread.currentThread().isInterrupted()){ /*阻塞等待*/ selector.select(); /*Set中的每一个key表明一个通道*/ Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); /*遍历每一个已就绪的通道,处理这个通道已就绪的事件*/ while(it.hasNext()){ SelectionKey key = it.next(); /*防止下次select方法返回已处理过的通道*/ it.remove(); /*经过SelectionKey获取对应的通道*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*经过SelectionKey获取通道对应的缓冲区*/ SocketChannel sc = (SocketChannel) key.channel(); /*表示底层socket的读缓冲区有数据可读*/ if(key.isReadable()){ /*从socket的读缓冲区读取到程序定义的缓冲区中*/ sc.read(readBuffer); readBuffer.flip(); /*字节到utf8解码*/ CharBuffer cb = utf8.decode(readBuffer); /*显示接收到由服务器发送的信息*/ System.out.println(cb.array()); readBuffer.clear(); } /*socket的写缓冲区可写*/ if(key.isWritable()){ writeBuffer.put((name + " " + i).getBytes("UTF-8")); writeBuffer.flip(); /*将程序定义的缓冲区中的内容写入到socket的写缓冲区中*/ sc.write(writeBuffer); writeBuffer.clear(); i++; } } Thread.sleep(1000 + rnd.nextInt(1000)); } }catch(InterruptedException e){ System.out.println(name + " is interrupted"); }catch(IOException e){ System.out.println(name + " encounter a connect error"); }finally{ try { selector.close(); } catch (IOException e1) { System.out.println(name + " close selector failed"); }finally{ System.out.println(name + " closed"); } } } } public static void main(String[] args) throws InterruptedException{ InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 8080); Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress)); ta.start(); Thread.sleep(5000); /*结束客户端a*/ ta.interrupt(); } }