我所理解的NIO

在Java的NIO中,有三个比较重要的概念:Buffer、Channel和Selector。java

结合上一篇文章提到的送花的例子。Buffer对应花,Channel对应A和B与花之间的联系,Selector就是不断进行轮询的线程。服务器

Channel分为ServerSocketChannel和SocketChannel,是客户端与服务端进行通讯的通道。socket

ServerSocketChannel用户服务器端,职责就是监听客户端的链接请求。一旦经过容许,就会创建与该客户端对应的SocketChannel。一个服务端的一个端口只能创建一个ServerSocketChannel用来监听链接。ide

SocketChannel具备惟一性。一个客户端可能连接多个服务端,那就是多个SocketChannel。服务端与多个客户端创建的链接就有多个SocketChannel。学习

Selector是用来负责阻塞轮询的线程,能够经过其静态方法Seletor.open()建立。服务端建立后经过Channel的register方法注册到ServerSocketChannel上,等待客户端链接。客户端一样建立Seletor后经过Channel的register方法注册到SocketChannel上。ui

当客户端的SocketChannel指定服务端的port和ip进行connect请求以后,服务端的Selector就能够检测到客户端的connect请求。而后服务端accept表示继续监听下一个请求,同时能够继续在与客户端创建了SocketChannel上监听读写请求。客户端同理。this

Selector的做用就是监听SelectionKey.OP_ACCEPT(服务端专属)、SelectionKey.OP_CONNECT(客户端专属)、SelectionKey.OP_READ、SelectionKey.OP_Write四种注册的请求,一旦有请求被容许,就会调用相关的方法进行处理。.net

Buffer是用于在Channel中传递的数据。Buffer里有4个属性,来表示数据在Buffer中的存取状况:线程

  • capacity:容量。Buffer的最大存储量,建立时指定,使用过程当中不会改变
  • limit:上线。Buffer中已有数据的最大值,<=capacity
  • position:索引位置。position从0开始,随着get和put方法自动更新,用来记录实时数据的位置
  • mark:用来暂存position的值。mark后能够经过reset方法将mark的值恢复到position

这4个属性的大小关系是:mark<=position<=limit<=capacitycode

接下来经过一个客户端与服务端通讯的例子,来学习使用NIO。客户端每隔1秒向服务端发送请求,服务端响应并返回数据。

服务端:

package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/** 
 * @Description : TODO
 * @Author : houshuiqiang@163.com, 2017年10月2日 下午2:58:31
 * @Modified :houshuiqiang@163.com, 2017年10月2日
 */
public class NioDemoServer {

    public static void main(String[] args) {
        NioServer nioServer = new NioServer(8181);
        new Thread(nioServer, "nio-server-test").start();
    }
}

class NioServer implements Runnable {
    
    private Selector selector;
    
    private ServerSocketChannel serverSocketChannel;
    
    private volatile boolean stop;
    
    public NioServer(int port){
        stop = false;
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    public void stop(){
        this.stop = true;
    }
    
    @Override
    public void run(){
        while (!stop){
            try {
                selector.select(); // 阻塞等待
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    try{
                        handleKey(selectionKey); // 可能发生客户端失联的错误
                    }catch(IOException e){
                        e.printStackTrace();
                        if (selectionKey != null) { // 将发生异常的客户端关闭,不然会一直被selector轮询到
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isValid()) {
            if (selectionKey.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel();
                SocketChannel socketChannel = ssc.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (selectionKey.isReadable()) {
                SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                String body = getBodyFromSocketChannel(socketChannel);
                if (null == body) {
                    // 断开链路
                    selectionKey.cancel();
                    selectionKey.channel().close();
                }else if ("".equals(body)) {
                    // 心跳检测 ,忽略
                }else{
                    String resultBody = handleBody(socketChannel, body);
                    write2Client(socketChannel, resultBody);
                }
            }
        }
        
    }


    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException{
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) { // 心跳检测,忽略
            return "";
        }else if (byteBufferSize > 0) {
            byteBuffer.flip();
            byte[] array = new byte[byteBuffer.remaining()];
            byteBuffer.get(array);
            return new String(array);
        }else{
            return null;
        }
    }
    
    private String handleBody(SocketChannel socketChannel, String body) {
        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
        
        System.out.println("message from client : " + hostAddress + ", content: " + body); // 模拟请求处理
        
        return "server received message: " + body; // 模拟返回处理结果
    }
    
    private void write2Client(SocketChannel socketChannel, String resultBody) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 真实场景每每比1024要大
        byteBuffer.put(resultBody.getBytes());
        byteBuffer.flip();
        socketChannel.register(selector, SelectionKey.OP_READ);
        socketChannel.write(byteBuffer);
    }
    
}

客户端:

package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** 
 * @Description : TODO
 * @Author : houshuiqiang@163.com, 2017年10月2日 下午5:58:49
 * @Modified :houshuiqiang@163.com, 2017年10月2日
 */
public class NioDemoClient {

    public static void main(String[] args) throws InterruptedException {
        NioClient nioClient = new NioClient("192.168.10.47", 8181);
        new Thread(nioClient, "nio-client-test").start();
        
        for (int i = 0; i < 10; i++) {
            nioClient.getQueue().offer("time" + i);
            Thread.sleep(1000);
        }
        nioClient.stop();
    }
}

class NioClient implements Runnable {
    private Selector selector;
    
    private SocketChannel socketChannel;
    
    private String address;
    
    private int port;
    
    private volatile boolean stop;
    
    private LinkedBlockingQueue<String> queue;
    
    public NioClient(String address, int port){
        this.address = address;
        this.port = port;
        this.stop = false;
        queue = new LinkedBlockingQueue<String>();
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    public BlockingQueue<String> getQueue(){
        return queue;
    }
    
    public void stop(){
        this.stop = true;
    }
    
    @Override
    public void run(){
        
        doConnect();
        
        while (!stop) {
            try {
                selector.select(); // 阻塞等待
                
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                
                while(iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    try{
                        handleKey(selectionKey); 
                    }catch(IOException e){
                        e.printStackTrace();
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }catch(InterruptedException e){
                        e.printStackTrace();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        try {
            socketChannel.close(); // 优雅关闭连接
            selector.close(); // 直接selector.close()会关闭全部该seletor上的全部channel,可是服务器会接收到客户端强制关闭的错误信息。
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doConnect() {
        try {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(address, port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    private void handleKey(SelectionKey selectionKey) throws IOException, InterruptedException {
        if (selectionKey.isValid()) {
            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
            if (selectionKey.isConnectable()) {
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }
            }
            if (selectionKey.isReadable()) {
                String resultBody = getBodyFromSocketChannel(socketChannel);
                if (null == resultBody) {
                    // 断开链路
                    selectionKey.cancel();
                    selectionKey.channel().close();
                }else if ("".equals(resultBody)) {
                    // 心跳检测 ,忽略
                }else{
                    System.out.println("received result : " + resultBody);
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }
            }
            if (selectionKey.isWritable()) {
                sendRequest(socketChannel);
            }
        }
    }

    private void sendRequest(SocketChannel socketChannel) throws IOException, InterruptedException {

        String requestBody = queue.poll(100, TimeUnit.MILLISECONDS);
        if (null != requestBody) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.put(requestBody.getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.register(selector, SelectionKey.OP_READ);
            
            if (! byteBuffer.hasRemaining()) {
                System.out.println("send request to server : " + requestBody);
            }
        }else {
            socketChannel.register(selector, SelectionKey.OP_WRITE);
        }
    }
    
    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) { // 心跳检测,忽略
            return "";
        }else if (byteBufferSize > 0) {
            byteBuffer.flip();
            byte[] array = new byte[byteBuffer.remaining()];
            byteBuffer.get(array);
            return new String(array);
        }else{
            return null;
        }
    }
    
}
相关文章
相关标签/搜索