NIO究竟是什么的简称?有人喜欢称之为New IO,由于它相对于之前的IO是新增的,因此官方称之为New IO。可是,因为以前的IO类库是阻塞的,New IO就是要让Java可以支持非阻塞IO,因此,也有人喜欢称之为Non-block IO。 java
Buffer 是一个对象, 它包含一些要写入或者刚读出的数据。 在 NIO 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream 对象中。
在 NIO 库中,全部数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入到缓冲区中的。任什么时候候访问 NIO 中的数据,您都是将它放到缓冲区中。
缓冲区实质上是一个数组。一般它是一个字节数组,可是也可使用其余种类的数组。可是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化访问,并且还能够跟踪系统的读/写进程。
最经常使用的缓冲区类型是 ByteBuffer。一个 ByteBuffer 能够在其底层字节数组上进行 get/set 操做(即字节的获取和设置)。react
Channel是一个通道,能够经过它读取与写入数据,它就像自来水管同样,网络数据经过Channel读取和写入。通道与流的不一样之处在于通道是双向的,流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),并且通道能够用于读、写或者同时用于读写。
由于Channel是全双工的,因此它能够比流更好的映射底层操做系统的API。特别是在UNIX网络编程模型中,底层操做系统的通道都是全双工的,同时支持读写操做。编程
多路复用器提供选择已经就绪的任务的能力。简单来说,Selector会不断地轮询注册在其上的Channel,若是某个channel上有新的TCP链接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,而后经过SelectionKey能够获取就绪Channel的集合,进行后续的I/O操做。
一个多路复用器Selector能够同时轮询多个Channel,因为JDK使用epool()代替传统的select实现,因此它并无最大连续句柄1024/2048的限制。这也就意味着只须要一个线程负责Selector的轮询,就能够接入成千上万的客户端。数组
ServerSocketChannel acceptor = ServerSocketChannel.open();
int port = 8080; acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port)); acceptor.configureBlocking(false);
Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
SelectionKey key = acceptor.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
int num = selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keys = selectedKeys.iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); //doWhat }
SocketChannel sc = serverChannel.accept();
sc.configureBlocking(false); sc.socket().setReuseAddress(true);
SelectionKey key = sc.register(selector,SelectionKey.OP_READ);
int number = sc.read(receivedBuffer);
while(buffer.hasRemain){ writeBuffer(); }
/** * used to test nio * Created by spark on 10/14/16. */ public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel serverSocketChannel; private volatile boolean stop; /** * 初始化多路复用器,绑定监听端口 * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); //设为异步非阻塞 serverSocketChannel.configureBlocking(false); //backlog设为1024 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port:" + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /** * 根据key的操做位获取网络事件的类型 TCP三次握手过程 * @param key * @throws IOException */ private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); //经过ByteBuffer读取客户端的请求信息 开辟1K的缓冲区 ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server received order : " + body ); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){ key.cancel(); sc.close(); }else{ } } } } /** * 经过ByteBuffer将应答消息异步发送给客户端 * @param socketChannel * @param response * @throws IOException */ private void doWrite(SocketChannel socketChannel,String response) throws IOException { if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); socketChannel.write(writeBuffer); } } @Override public void run() { //遍历selector,间隔为1s while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; //有就绪状态的Channel时,selector返回就绪状态的Channel的SelectionKey集合,经过对就绪状态的Channel集合进行迭代,进行异步读写操做 while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } } }
public class TimeServer { public static void main(String[] args) { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false); clientChannel.socket().setReuseAddress(true); clientChannel.socket().setReceiveBufferSize(BUFFER_SIZE); clientChannel.socket().setSendBufferSize(BUFFER_SIZE);
boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));
if(connected){ clientChannel.register(selector,SelectionKey.OP_READ,ioHandler); }else{ clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler); }
Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
int num = selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keys = selectedKeys.iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); }
if(key.isConnectable()){ //handleConnect }
if(clientChannel.finishConnect()) registerRead();
int number = sc.read(receivedBuffer); while(buffer.hasRemain){ }
public class TimeClientHandler implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandler(int port, String host) { this.port = port; this.host = host == null ? "127.0.0.1" : host; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } private void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector,SelectionKey.OP_READ); doWrite(sc); }else{ System.exit(1); } } if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is " + body); this.stop = true; }else if(readBytes < 0){ key.cancel(); sc.close(); }else{ } } } } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){ System.out.println("Send order 2 server succeed."); } } private void doConnect() throws IOException { if(socketChannel.connect(new InetSocketAddress(host,port))){ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel); }else{ socketChannel.register(selector,SelectionKey.OP_CONNECT); } } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } }
public class TimeClient { public static void main(String[] args) { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new Thread(new TimeClientHandler(port,"127.0.0.1"),"TimeClient-001").start(); } }
NIO2.0引入了新的异步通道概念,并提供了异步文件通道和异步套接字通道的实现。
异步通道提供两种方式获取操做结果网络
- 经过java.util.concurrent.Feature类来表示异步操做的结果;
- 在执行异步操做的时候传入一个java.nio.channels。
CompletionHandler接口的实现类做为操做完成的回调。
public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); // 建立一个异步服务端通道。 asynchronousServerSocketChannel.bind(new InetSocketAddress(port));// bind 一个监听端口 System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); // 在完成一组正在执行的操做以前,容许当前的线程一直阻塞。 doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// 处理接受消息的通知。 } }
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 若是没有发送完成,继续发送 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } } }
public class TimeServer { public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable{ private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read( readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }
public class TimeClient { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start(); } }