BIO是基于字节流和字符流进行操做的,而NIO是基于通道Channel和缓冲区Buffer进行操做的,数据从通道读取到缓冲区中,或者从缓冲区写入到通道中。java
NIO的类库位于java.lang.nio下,其中有以下一些基本的概念:编程
通道Channel:通道的做用与BIO中的流相似,主要不一样的地方在于:缓存
缓冲区Buffer:在BIO中,能够直接将数据写入或者直接读取到流中,也能够经过装饰类添加缓冲的功能;而在NIO中,全部的数据都是用缓冲区处理的,任什么时候候使用NIO读取或者写入数据都是经过缓冲区进行的。缓冲区本质上是一块能够读写数据的内存,这块内存被包装成NIO Buffer对象,并提供了一组方法来访问该块内存。服务器
分散/汇集scatter/gather:分散和汇集是用来描述从通道中读取或者写入通道的操做。分散从通道中读取是指读操做时将读取的数据写入多个缓冲区中;汇集写入通道是指写入操做时将多个缓冲区的数据写入到同一个通道中。分散/汇集一般用于须要将传输数据分开处理的场合,如传输一个消息能够将消息头和消息体分散到不一样的buffer中。网络
选择器Selector:selector模型是NIO编程的基础,多路复用器Selector经过不断地轮询已经注册过的通道,检测出就绪的通道集合,从而能够实现一个线程管理多个通道,管理多个网络链接。并发
java.nio包中经常使用的Channel实现类有:dom
FileChannel没法设置为非阻塞模式,只能运行在阻塞模式下。使用FileChannel的几个基本步骤包括:打开FileChannel、从FileChannel读写数据、关闭FileChannel。异步
private static final int BUF_SIZE = 1024; public static void main(String[] args) { // 打开FileChannel须要经过FileIntputStream或FileOutputStream或RandomAccessFile try (FileOutputStream out = new FileOutputStream(new File("d:\\test.txt")); FileChannel channel = out.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); buffer.put("NIO学习:NIO FileChannel Demo".getBytes());// 先往buffer中写入数据 buffer.flip();// 调转buffer中读写指针position的位置 printBuffer(buffer); //打印buffer中内容 channel.write(buffer); // 将buffer中数据写入channel } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void printBuffer(ByteBuffer buffer) { try { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer cBuf = decoder.decode(buffer); buffer.flip(); System.out.println(cBuf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
private static final int BUF_SIZE = 1024; public static void main(String[] args) { //channel关联文件——>经过channel读取数据到buffer try (FileInputStream in = new FileInputStream(new File("d:\\test.txt")); FileChannel channel = in.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); channel.read(buffer); buffer.flip(); printBuffer(buffer); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void printBuffer(ByteBuffer buffer) { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); try { CharBuffer buf = decoder.decode(buffer); buffer.flip(); System.out.println(buf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
close():使用完FileChannel须要关闭channel,为简洁代码可使用try-with-resources语法。socket
position():position()方法能够获取FileChannel当前的位置,而position(long pos)能够设置FileChannel当前位置,可是若是将位置设置在文件结束符以后,调用position()将返回-1;调用position(pos)写入数据,则会把文件撑大到当前位置并写入数据,这样会致使磁盘上物理文件中写入的数据间有空隙。性能
long pos = channle.position(); channel.position(100L);
size():channel.size()方法返回的是channel关联文件的大小。
truncate(long size):truncate()方法用来截取文件,此时文件中指定长度后面的部分将会被删除,如:
channel.truncate(100);// 将会保留前100个字节
DatagramChannel是用来收发UDP包的通道,由于UDP是无链接的网络协议,因此DatagramChannel收发的是UDP数据包。
public static void main(String[] args) throws InterruptedException { new Thread(() -> { // 打开DatagramChannel,监听UDP9999端口 try (DatagramChannel channel = DatagramChannel.open()) { channel.socket().bind(new InetSocketAddress(9999)); ByteBuffer buffer = ByteBuffer.allocate(100); // 经过channel的recevice方法接收UDP数据包 channel.receive(buffer); buffer.flip(); printBuffer(buffer); } catch (IOException e) { e.printStackTrace(); } }).start(); Thread.sleep(1000); // 服务端先启动 new Thread(() -> { try (DatagramChannel channel = DatagramChannel.open();) { ByteBuffer buf = ByteBuffer.allocate(100); buf.clear(); buf.put("DatagramChannel Demo".getBytes()); // 发送数据前注意要把buffer的position置为0 buf.flip(); // 调用send方法发送到指定IP地址的指定端口 channel.send(buf, new InetSocketAddress("localhost", 9999)); } catch (IOException e) { e.printStackTrace(); } }).start(); }
因为UDP是无链接的,当指定链接的IP地址或域名时并不会建立一个真正的链接,而是锁住了DatagramChannel,只能从锁定的地址收发数据,而且数据传送没有可靠性保证。
public static void main(String[] args) throws InterruptedException { Thread server = new Thread(() -> { try (ServerSocketChannel channel = ServerSocketChannel.open()) { channel.socket().bind(new InetSocketAddress(44593)); SocketChannel socket = channel.accept(); ByteBuffer buffer = ByteBuffer.allocate(1024); socket.read(buffer); buffer.flip(); printBuffer(buffer); } catch (IOException e) { e.printStackTrace(); } }); Thread client = new Thread(() -> { try (SocketChannel channel = SocketChannel.open()) { channel.socket().connect(new InetSocketAddress("127.0.0.1", 44593)); ByteBuffer buffer = ByteBuffer.allocate(1034); buffer.put("socket channle demo".getBytes()); buffer.flip(); channel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }); //启动顺序不影响结果 server.start(); client.start(); }
Buffer做为数据的读写缓冲区,具有读和写两种模式。
public abstract class Buffer { // Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; ...... public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } ...... }
capacity:buffer的容量,在申请buffer时指定大小,是固定不变的。
limit:buffer可使用的上限——写模式下表示最多能往buffer中写入数据的边界,初始化时limit等于capacity;调用flip()切换为读模式后limit会等于当前的position;表示能读到的数据边界。当一次读写操做完成后,limit的值可能不会等于capacity,存在内存泄露的状况(这个不知道算不算设计不够友好),避免这种状况要在每一次读写操做完成后执行clear()方法清空buffer。
position:position能够当作是一个读写指针,指示当前读或写的位置,随put/get方法自动更新,当buffer中的数据准备好了,须要从写模式切换为读模式时,须要调用buffer.flip()方法,能够看到flip()方法会将当前写的最后一个位置赋值给limit,而后将position切换为0,即变成从0位置开始读,能够读到limit位置,反之从读模式切换为写模式也是如此。
mark: mark用来标记某个时刻的一个position,经过调用buffer.mark()方法能够记录当前的position,以后能经过buffer.reset()恢复到这个position。mark默认值是-1,而且其值必须小于position,若是调用buffer.position(index)时传入的index比mark小,则会将mark设置为-1使暂存的位置失效。
这4个属性的大小关系是$mark <= position <= limit <= capacity $
public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put("abcde".getBytes()); buffer.flip(); while (buffer.hasRemaining()) { System.out.print(buffer.get() + " "); } buffer.flip(); try { buffer.put("abcdef".getBytes()); //没有clear()将抛出BufferOverflowException } catch (Exception e) { e.printStackTrace(); } }
一个selector能够注册多个channel,而且seletor要求channel必须工做在非阻塞模式下,所以FileChannel不能结合selector使用,同时注册的channel须要调用 channel.configureBlocking(flase);
设置为非阻塞通道。
channel声明了一个抽象方法用来注册channel到selector:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
SelectionKey:SelectionKey包含了许多有用的属性,如interest集合、ready集合、channel对象、selector对象等;经过SelectionKey返回值,能够进行各类操做。
ops:ops是一个int类型数,其实质表示的是事件类型的集合,即channel注册selector时告诉selector其对哪些事件感兴趣。SelectionKey中只定义了四种事件类型,分别用四个常量表示:
而且channel也不是四种事件都能注册,不一样的channel只能注册validOps()方法中有定义的事件。
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); }
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE); }
public final int validOps() { return SelectionKey.OP_ACCEPT; }
注册了selector的channel便能将本来由本身调用accept的工做交由selector来代替。selector经过select()方法根据channel注册时所关联的感兴趣的事件返回准备就绪的channel。此时,本来阻塞在channel.accept()上的操做变成了阻塞在selector.select()上。
当select()返回值大于0时,说明有channel准备就绪了,进一步处理能够按如下步骤进行:
Set keys = selector.selectedKeys();
;selectNow()和select()不一样之处在于前者不会阻塞当前线程,而是直接返回。
wakeUp()是用来唤醒被select()阻塞的线程的,有的时候select()阻塞的线程,咱们不想其一直被阻塞,而是一段时间内若是没有通道就绪就继续执行,那么这个时候能够在另一个线程里调用selector.wakeUp(),可是这里有个“坑”就是若是当前的selector没有被阻塞在select上,那么下一次调用该selector对象的select方法会被当即唤醒。
public class Server { public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.socket().bind(new InetSocketAddress(44593)); channel.register(selector, channel.validOps()); while (true) { while (selector.select() == 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } if(key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(100); client.read(buffer); buffer.flip(); BufferUtil.printBuffer(buffer); client = (SocketChannel) key.channel(); client.register(selector, SelectionKey.OP_READ); } keys.clear(); } } } catch (Exception e) { e.printStackTrace(); } } }
public class Client { public static void main(String[] args) { try (Scanner sc = new Scanner(System.in); Socket socket = new Socket("127.0.0.1", 44593);) { String input = sc.nextLine(); while (input != null && !"".equals(input.trim())) { socket.getOutputStream().write(input.getBytes()); input = sc.nextLine(); } } catch (Exception e) { e.printStackTrace(); } } }