在Java的NIO中,有三个比较重要的概念:Buffer、Channel和Selector。
结合上一篇文章提到的送花的例子:Buffer对应花,Channel对应A和B与花之间的联系,Selector对应不断进行轮询的线程。
Channel分为ServerSocketChannel和SocketChannel,是客户端与服务端进行通信的通道。
ServerSocketChannel用于服务器端,职责就是监听客户端的连接请求。一旦连接请求被接受(accept),就会建立与该客户端对应的SocketChannel。服务端的一个端口只能绑定一个ServerSocketChannel用来监听连接。
SocketChannel具有唯一性:每个SocketChannel对应一条客户端与服务端之间的连接。一个客户端可能连接多个服务端,那就是多个SocketChannel;服务端与多个客户端建立的连接也对应多个SocketChannel。
Selector是多路复用器,通常由一个线程阻塞地对它进行轮询,可以通过其静态方法Selector.open()创建。服务端创建Selector后,通过Channel的register方法把ServerSocketChannel注册到Selector上,等待客户端连接。客户端同样创建Selector后,通过Channel的register方法把SocketChannel注册到Selector上。
当客户端的SocketChannel指定服务端的ip和port发起connect请求之后,服务端的Selector就可以检测到客户端的连接请求。然后服务端通过accept得到与该客户端对应的SocketChannel,ServerSocketChannel继续监听下一个连接请求,同时服务端可以继续在与客户端建立的SocketChannel上监听读写事件。客户端同理。
Selector的作用就是监听SelectionKey.OP_ACCEPT(服务端专属)、SelectionKey.OP_CONNECT(客户端专属)、SelectionKey.OP_READ、SelectionKey.OP_WRITE四种已注册的事件,一旦有事件就绪,就可以调用相关的方法进行处理。
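Buffer是用于承载在Channel中传递的数据的容器。Buffer里有4个属性,来表示数据在Buffer中的存取情况:capacity(容量,创建后不再改变)、limit(第一个不能读写的位置)、position(下一个要读写的位置)、mark(调用mark()时记录的position,可通过reset()恢复)。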
这4个属性的大小关系是:mark <= position <= limit <= capacity。
接下来通过一个客户端与服务端通信的例子,来学习使用NIO。客户端每隔1秒向服务端发送请求,服务端响应并返回数据。
服务端:
package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Entry point of the NIO demo server: starts a {@link NioServer} on port 8181
 * in a thread named "nio-server-test".
 *
 * @Author : houshuiqiang@163.com, 2017-10-02 14:58:31
 * @Modified : houshuiqiang@163.com, 2017-10-02
 */
public class NioDemoServer {

    public static void main(String[] args) {
        NioServer nioServer = new NioServer(8181);
        new Thread(nioServer, "nio-server-test").start();
    }
}

/**
 * Single-threaded, selector-driven server. For every chunk of bytes read from
 * a client it prints the chunk and writes back
 * {@code "server received message: " + chunk}.
 *
 * <p>There is no message framing: each successful read (at most 1024 bytes)
 * is handled as one message. Text is encoded and decoded as UTF-8.
 *
 * <p>All channel and key handling happens on the thread executing
 * {@link #run()}; {@link #stop()} may be called from any thread.
 */
class NioServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean stop;

    /**
     * Opens the selector and a non-blocking listening channel bound to
     * {@code port}, registered for {@code OP_ACCEPT}.
     * Prints the stack trace and exits the JVM with status 1 if that fails.
     *
     * @param port TCP port to listen on
     */
    public NioServer(int port) {
        stop = false;
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Asks the event loop to terminate. The selector is woken up because the
     * loop is normally blocked inside {@code select()} and would otherwise
     * not notice the flag until the next network event.
     */
    public void stop() {
        this.stop = true;
        selector.wakeup();
    }

    /**
     * Event loop: waits for ready keys and dispatches each one to
     * {@link #handleKey(SelectionKey)}. When the loop ends (stop requested,
     * thread interrupted, or an unchecked exception) every registered channel
     * and the selector are closed.
     */
    @Override
    public void run() {
        try {
            // select() returns immediately on an interrupted thread, so an
            // interrupt is treated as a stop request instead of spinning.
            while (!stop && !Thread.currentThread().isInterrupted()) {
                try {
                    selector.select(); // blocks until a key is ready or wakeup() is called
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        try {
                            handleKey(selectionKey); // may fail when the client went away
                        } catch (IOException e) {
                            e.printStackTrace();
                            // Drop the failing channel, otherwise the selector keeps reporting it.
                            closeKey(selectionKey);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            shutdown();
        }
    }

    /**
     * Handles one ready key: accepts a new connection, continues a pending
     * response write, or reads a request and answers it.
     *
     * @param selectionKey key taken from the selected-key set
     * @throws IOException if the underlying channel operation fails
     */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            acceptClient((ServerSocketChannel) selectionKey.channel());
            return;
        }
        if (selectionKey.isWritable()) {
            flushPending(selectionKey);
            return;
        }
        if (selectionKey.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            String body = getBodyFromSocketChannel(socketChannel);
            if (body == null) {
                // End of stream: the peer closed the connection.
                selectionKey.cancel();
                socketChannel.close();
            } else if (!body.isEmpty()) {
                // An empty body means nothing was read (treated as a heartbeat) and is ignored.
                String resultBody = handleBody(socketChannel, body);
                write2Client(socketChannel, resultBody);
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for reads. A failure while
     * setting up the accepted socket closes only that socket. An
     * {@code IOException} thrown by {@code accept()} itself propagates to the
     * caller.
     */
    private void acceptClient(ServerSocketChannel ssc) throws IOException {
        SocketChannel socketChannel = ssc.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            closeChannel(socketChannel);
        }
    }

    /**
     * Performs one read of up to 1024 bytes.
     *
     * @return the bytes decoded as UTF-8; {@code ""} when nothing was
     *         available; {@code null} when the peer closed the stream
     */
    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) {
            return "";
        }
        if (byteBufferSize < 0) {
            return null;
        }
        byteBuffer.flip();
        byte[] array = new byte[byteBuffer.remaining()];
        byteBuffer.get(array);
        return new String(array, StandardCharsets.UTF_8);
    }

    /**
     * Simulates request processing: logs the request and builds the reply.
     *
     * @return the reply text for {@code body}
     */
    private String handleBody(SocketChannel socketChannel, String body) {
        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
        System.out.println("message from client : " + hostAddress + ", content: " + body);
        return "server received message: " + body;
    }

    /**
     * Sends {@code resultBody} to the client. The buffer is sized from the
     * payload, so replies longer than 1024 bytes are supported. If the socket
     * does not take everything at once, the remainder is attached to the key
     * and the key is switched to {@code OP_WRITE}; reading resumes after
     * {@link #flushPending(SelectionKey)} has drained it.
     */
    private void write2Client(SocketChannel socketChannel, String resultBody) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.wrap(resultBody.getBytes(StandardCharsets.UTF_8));
        socketChannel.write(byteBuffer);
        if (byteBuffer.hasRemaining()) {
            SelectionKey key = socketChannel.keyFor(selector);
            if (key == null) {
                throw new IOException("channel is not registered with the selector");
            }
            key.attach(byteBuffer);
            key.interestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * Continues writing the response attached to {@code selectionKey}; once it
     * is fully written the attachment is cleared and the key goes back to
     * {@code OP_READ}.
     */
    private void flushPending(SelectionKey selectionKey) throws IOException {
        ByteBuffer pending = (ByteBuffer) selectionKey.attachment();
        if (pending != null) {
            ((SocketChannel) selectionKey.channel()).write(pending);
        }
        if (pending == null || !pending.hasRemaining()) {
            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Cancels the key and closes its channel, logging any close failure. */
    private void closeKey(SelectionKey selectionKey) {
        selectionKey.cancel();
        closeChannel(selectionKey.channel());
    }

    /** Closes every channel still registered with the selector, then the selector. */
    private void shutdown() {
        if (selector == null || !selector.isOpen()) {
            return;
        }
        for (SelectionKey key : selector.keys()) {
            closeChannel(key.channel());
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes {@code channel} if it is not null; a close failure is only logged. */
    private static void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客户端:
package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Entry point of the NIO demo client: connects to 192.168.10.47:8181, queues
 * one request per second ("time0" .. "time9") and then stops the client.
 *
 * @Author : houshuiqiang@163.com, 2017-10-02 17:58:49
 * @Modified : houshuiqiang@163.com, 2017-10-02
 */
public class NioDemoClient {

    public static void main(String[] args) throws InterruptedException {
        NioClient nioClient = new NioClient("192.168.10.47", 8181);
        new Thread(nioClient, "nio-client-test").start();
        for (int i = 0; i < 10; i++) {
            nioClient.getQueue().offer("time" + i);
            Thread.sleep(1000);
        }
        nioClient.stop();
    }
}

/**
 * Single-threaded, selector-driven client working in request/response
 * lock-step: while the key is interested in {@code OP_WRITE} it takes the next
 * request from the queue and sends it, then switches to {@code OP_READ} until
 * a response arrives, then back to {@code OP_WRITE}.
 *
 * <p>Text is encoded and decoded as UTF-8. Requests are handed over through
 * {@link #getQueue()}, and {@link #stop()} may be called from any thread; all
 * channel handling happens on the thread executing {@link #run()}.
 */
class NioClient implements Runnable {

    private Selector selector;
    private SocketChannel socketChannel;
    private final String address;
    private final int port;
    private volatile boolean stop;
    private final BlockingQueue<String> queue;

    // Request currently being written; only touched by the selector thread.
    private ByteBuffer pendingRequest;
    private String pendingRequestBody;

    /**
     * Opens the selector and a non-blocking socket channel (not yet
     * connected). Prints the stack trace and exits the JVM with status 1 if
     * that fails.
     *
     * @param address server host name or IP address
     * @param port    server TCP port
     */
    public NioClient(String address, int port) {
        this.address = address;
        this.port = port;
        this.stop = false;
        queue = new LinkedBlockingQueue<String>();
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** @return the queue from which the client takes request bodies to send */
    public BlockingQueue<String> getQueue() {
        return queue;
    }

    /**
     * Asks the event loop to terminate. The selector is woken up so that a
     * loop blocked in {@code select()} (for example waiting for a response, or
     * after the connection was lost) notices the flag.
     */
    public void stop() {
        this.stop = true;
        selector.wakeup();
    }

    /**
     * Connects, then runs the event loop until {@link #stop()} is called or
     * the thread is interrupted. On exit the socket channel is closed before
     * the selector.
     */
    @Override
    public void run() {
        doConnect();
        try {
            // select() returns immediately on an interrupted thread, so an
            // interrupt is treated as a stop request instead of spinning.
            while (!stop && !Thread.currentThread().isInterrupted()) {
                try {
                    selector.select(); // blocks until a key is ready or wakeup() is called
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        try {
                            handleKey(selectionKey);
                        } catch (IOException e) {
                            e.printStackTrace();
                            selectionKey.cancel();
                            closeChannel(selectionKey.channel());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            // Restore the interrupt status and leave the event loop.
                            Thread.currentThread().interrupt();
                            stop = true;
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // Close the connection explicitly first, then the selector.
            closeChannel(socketChannel);
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the non-blocking connect. If the connection is established
     * immediately the channel is registered for {@code OP_WRITE} right away;
     * otherwise it is registered for {@code OP_CONNECT} and completed in
     * {@link #handleKey(SelectionKey)}. Exits the JVM with status 1 on
     * {@code IOException}.
     */
    private void doConnect() {
        try {
            boolean connected = socketChannel.connect(new InetSocketAddress(address, port));
            int ops = connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT;
            socketChannel.register(selector, ops);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Handles one ready key: finishes the connect, reads a response, or sends
     * the next request.
     *
     * @throws IOException          if the channel operation fails
     * @throws InterruptedException if interrupted while waiting for a request
     */
    private void handleKey(SelectionKey selectionKey) throws IOException, InterruptedException {
        if (!selectionKey.isValid()) {
            return;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (selectionKey.isConnectable()) {
            if (channel.finishConnect()) {
                channel.register(selector, SelectionKey.OP_WRITE);
            }
        }
        if (selectionKey.isReadable()) {
            String resultBody = getBodyFromSocketChannel(channel);
            if (resultBody == null) {
                // End of stream: the server closed the connection.
                selectionKey.cancel();
                channel.close();
                // The key is cancelled now; querying isWritable() on it would
                // throw CancelledKeyException.
                return;
            }
            if (!resultBody.isEmpty()) {
                // An empty body means nothing was read (treated as a heartbeat) and is ignored.
                System.out.println("received result : " + resultBody);
                channel.register(selector, SelectionKey.OP_WRITE);
            }
        }
        if (selectionKey.isWritable()) {
            sendRequest(channel);
        }
    }

    /**
     * Sends the next queued request, waiting up to 100 ms for one to appear.
     * If none is available the key simply stays on {@code OP_WRITE} and this
     * method is entered again on the next selection. A request that the socket
     * does not take in one write is kept and continued on the next call; only
     * after the last byte is written does the key switch to {@code OP_READ}.
     */
    private void sendRequest(SocketChannel socketChannel) throws IOException, InterruptedException {
        if (pendingRequest == null) {
            String requestBody = queue.poll(100, TimeUnit.MILLISECONDS);
            if (requestBody == null) {
                return;
            }
            pendingRequestBody = requestBody;
            // Sized from the payload, so requests longer than 1024 bytes are supported.
            pendingRequest = ByteBuffer.wrap(requestBody.getBytes(StandardCharsets.UTF_8));
        }
        socketChannel.write(pendingRequest);
        if (pendingRequest.hasRemaining()) {
            return; // stay on OP_WRITE and finish on the next selection
        }
        System.out.println("send request to server : " + pendingRequestBody);
        pendingRequest = null;
        pendingRequestBody = null;
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Performs one read of up to 1024 bytes.
     *
     * @return the bytes decoded as UTF-8; {@code ""} when nothing was
     *         available; {@code null} when the peer closed the stream
     */
    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) {
            return "";
        }
        if (byteBufferSize < 0) {
            return null;
        }
        byteBuffer.flip();
        byte[] array = new byte[byteBuffer.remaining()];
        byteBuffer.get(array);
        return new String(array, StandardCharsets.UTF_8);
    }

    /** Closes {@code channel} if it is not null; a close failure is only logged. */
    private static void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}