网络通信编程 (一)BIO、NIO、AIO

1.BIO

所谓的 BIO 意思其实是 Blocking I/O,也就是所谓的阻塞式 I/O,那么阻塞式 I/O 到底是什么样的呢?为什么会阻塞呢?接下来以一段基于 Socket通信 的 Client/Server 示例来阐述这个问题。

先是 Client 端:客户端很简单,就是将键盘输入的字符串发送给服务器端

public class BIOClient {
	private static Charset charset = Charset.forName("UTF-8");

	public static void main(String[] args) throws Exception {
		Socket s = new Socket("localhost", 8080);
		OutputStream out = s.getOutputStream();

		Scanner scanner = new Scanner(System.in);
		System.out.println("请输入:");
		String msg = scanner.nextLine();
		out.write(msg.getBytes(charset)); // 阻塞,写完成
		scanner.close();
		s.close();
	}

}

再就是 Server 端:服务器端是跟客户端建立Socket连接,然后读取客户端发送的数据并且将数据打印到 Console 上。

public class BIOServer {

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("服务器启动成功");
        while (!serverSocket.isClosed()) {
            Socket request = serverSocket.accept();// 阻塞
            System.out.println("收到新连接 : " + request.toString());
            try {
                // 接收数据、打印
                InputStream inputStream = request.getInputStream(); // net + i/o
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
                String msg;
                while ((msg = reader.readLine()) != null) { // 没有数据,阻塞
                    if (msg.length() == 0) {
                        break;
                    }
                    System.out.println(msg);
                }
                System.out.println("收到数据,来自:"+ request.toString());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    request.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        serverSocket.close();
    }
}

以上客户端和服务器端确实可以实现通信,但是同时一个弊端也显现出来:服务器端每次只能跟一个客户端进行交互(因为输入输出流的 read,write 方法是阻塞的),而且服务器端在没有接受到请求的时候不能做任何事(因为 socket 的 accept 方法是阻塞的),这很明显不符合我们的预期即网络通信。

接下来我们将服务器端进行改良:想要与多个客户端进行通信,那么我们只需要利用到前面学到的多线程的知识,将连接和数据的读取分开,也就是在与一个客户端建立连接后,将读取数据的任务放到线程池中,从而不会阻塞当前主线程的执行。

// 多线程支持
public class BIOServer1 {
    private static ExecutorService threadPool = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("tomcat 服务器启动成功");
        while (!serverSocket.isClosed()) {
            Socket request = serverSocket.accept();
            System.out.println("收到新连接 : " + request.toString());
            threadPool.execute(() -> {
                try {
                    // 接收数据、打印
                    InputStream inputStream = request.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
                    String msg;
                    while ((msg = reader.readLine()) != null) { // 阻塞
                        if (msg.length() == 0) {
                            break;
                        }
                        System.out.println(msg);
                    }
                    System.out.println("收到数据,来自:"+ request.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        request.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        serverSocket.close();
    }
}

 以上确实可以解决同时与多个客户端建立连接并通信的过程,但是为每个请求都建立一个线程未免也太消耗系统资源了,而且线程池中线程的切换也是有性能损耗的,另外还是没有解决服务器在接收到请求连接之前必须阻塞等待连接的问题。

2.NIO

NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标 准。它是在Java 1.4中被纳入到JDK中的,并具有以下特性: 

  • NIO是基于块(Block)的,它以块为基本单位处理数据 (硬盘上存储的单位也是按Block来存储,这样性能上比基于流的方式要好一些)
  • 为所有的原始类型提供(Buffer)缓存支持 
  • 增加通道(Channel)对象,作为新的原始 I/O 抽象
  • 支持锁(我们在平时使用时经常能看到会出现一些.lock的文件,这说明有线程正在使用这把锁,当线程释放锁时,会把这个文件删除掉,这样其他线程才能继续拿到这把锁)和内存映射文件的文件访问接口 
  • 提供了基于Selector的异步网络I/O 

所有的从通道中的读写操作,都要经过Buffer,而通道就是io的抽象,通道的另一端就是操纵的文件。

Java NIO 由以下几个核心部分组成:

  • Channel
  • Buffer
  • Selector

我们先来使用 NIO 中提到的 Channel,Buffer,Selector 继续改进 BIO 中的服务器端程序。

首先,将使用 NIO 方式的客户端代码提供出来,接着我们来做服务器端的改进。

public class NIOClient {

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (!socketChannel.finishConnect()) {
            // 没连接上,则一直等待
            Thread.yield();
        }
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        // 发送内容
        String msg = scanner.nextLine();
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        // 读取响应
        System.out.println("收到服务端响应:");
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
            // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
            if (requestBuffer.position() > 0) break;
        }
        requestBuffer.flip();
        byte[] content = new byte[requestBuffer.limit()];
        requestBuffer.get(content);
        System.out.println(new String(content));
        scanner.close();
        socketChannel.close();
    }

}

 

服务器端第一次改进:使用到 Channel 和 Buffer 组件,默认的 Socket 通信都是阻塞的,因此即使使用 NIO 中的 channel,如果没有设置使用非阻塞的方式,那么读取连接请求还是会阻塞;数据的读取部分是一个 while 循环,这也算是自己实现的一个阻塞。在这一版的实现中,仍旧无法同时处理多个客户端的请求。

/**
 * 直接基于非阻塞的写法
 */
public class NIOServer {

    public static void main(String[] args) throws Exception {
        // 创建网络服务端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 绑定端口
        System.out.println("启动成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 获取新tcp连接通道
            // tcp请求 读取/响应
            if (socketChannel != null) {
                System.out.println("收到新连接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默认是阻塞的,一定要设置为非阻塞
                try {
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                        // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if(requestBuffer.position() == 0) continue; // 如果没数据了, 则不继续后面的处理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println("收到数据,来自:"+ socketChannel.getRemoteAddress());

                    // 响应结果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer);// 非阻塞
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 用到了非阻塞的API, 在设计上,和BIO可以有很大的不同.继续改进
    }
}

服务器端第二次改进:依旧使用 NIO 中的 Channel 和 ByteBuffer,通过一个线程轮询所有的通道以确定数据是否准备好了来确定是否要进行数据读取的操作。服务器端在收到连接请求的时候,先将连接请求加入到一个 ArrayList 的集合对象 channels 中(后面用来做轮询),同时也不断的接收新的连接,当没有连接的时候就去轮询 channels 中的 Channel 对象,如果有 Channel 中可以读到 ByteBuffer 的数据,那么就处理这个通道的请求。虽然可以通过一个线程处理多个连接,但是轮询的方式效率太低,而且一直只有一个线程来处理数据读取和业务执行,实在太浪费 CPU。

/**
 * 直接基于非阻塞的写法,一个线程处理轮询所有请求
 */
public class NIOServer1 {
    /**
     * 已经建立连接的集合
     */
    private static ArrayList<SocketChannel> channels = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        // 创建网络服务端
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 绑定端口
        System.out.println("启动成功");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept(); // 获取新tcp连接通道
                // tcp请求 读取/响应
                if (socketChannel != null) {
                System.out.println("收到新连接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 默认是阻塞的,一定要设置为非阻塞
                channels.add(socketChannel);
            } else {
                // 没有新连接的情况下,就去处理现有连接的数据,处理完的就删除掉
                Iterator<SocketChannel> iterator = channels.iterator();
                while (iterator.hasNext()) {
                    SocketChannel ch = iterator.next();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

                        if (ch.read(requestBuffer) == 0) {
                            // 等于0,代表这个通道没有数据需要处理,那就待会再处理
                            continue;
                        }
                        while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                            // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if(requestBuffer.position() == 0) continue; // 如果没数据了, 则不继续后面的处理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到数据,来自:" + ch.getRemoteAddress());

                        // 响应结果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            ch.write(buffer);
                        }
                        iterator.remove();
                    } catch (IOException e) {
                        e.printStackTrace();
                        iterator.remove();
                    }
                }
            }
        }
        // 用到了非阻塞的API, 再设计上,和BIO可以有很大的不同
        // 问题: 轮询通道的方式,低效,浪费CPU
    }
}

服务器端第三次改进:这次改进用到了 NIO 的第三个利器,也就是 Selector 组件。Selector 的工作模式是基于事件通知的方式,例如服务器端的 Channel 对象在初始化完之后就将自己感兴趣的 OP_ACCEPT 即读取连接请求的事件注册到 Selector 上面;还有启动服务器之后,当有客户端与客户端建立连接请求,就会获得一个客户端 Channel 对象(同样也要设置成非阻塞的),因为之后需要跟客户端有数据通信,因此客户端 Channel 对象也同样要将自己感兴趣的 OP_READ 即读取数据的事件注册到 Selector 上面,当真的有数据发送过来的时候触发。这次改进使得原先写的轮询机制变成这种事件通知的机制,在执行效率上会更高效,但是仍旧不能处理大量请求到来的情况,因此我们还是要结合多线程的知识来实现并发处理。

/**
 * 结合Selector实现的非阻塞服务端(放弃对channel的轮询,借助消息通知机制)
 */
public class NIOServerV2 {

    public static void main(String[] args) throws Exception {
        // 1. 创建网络服务端ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式

        // 2. 构建一个Selector选择器,并且将channel注册上去
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 将serverSocketChannel注册到selector
        selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 对serverSocketChannel上面的accept事件感兴趣(serverSocketChannel只能支持accept操作)

        // 3. 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

        System.out.println("启动成功");

        while (true) {
            // 不再轮询通道,改用下面轮询事件的方式.select方法有阻塞效果,直到有事件通知才会有返回
            selector.select();
            // 获取事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍历查询结果e
            Iterator<SelectionKey> iter = selectionKeys.iterator();
            while (iter.hasNext()) {
                // 被封装的查询结果
                SelectionKey key = iter.next();
                iter.remove();
                // 关注 Read 和 Accept两个事件
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.attachment();
                    // 将拿到的客户端连接通道,注册到selector上面
                    SocketChannel clientSocketChannel = server.accept(); // mainReactor 轮询accept
                    clientSocketChannel.configureBlocking(false);
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
                    System.out.println("收到新连接 : " + clientSocketChannel.getRemoteAddress());
                }

                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.attachment();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                            // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if(requestBuffer.position() == 0) continue; // 如果没数据了, 则不继续后面的处理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到数据,来自:" + socketChannel.getRemoteAddress());
                        // TODO 业务操作 数据库 接口调用等等

                        // 响应结果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            socketChannel.write(buffer);
                        }
                    } catch (IOException e) {
                        // e.printStackTrace();
                        key.cancel(); // 取消事件订阅
                    }
                }
            }
            selector.selectNow();
        }
        // 问题: 此处一个selector监听所有事件,一个线程处理所有请求事件. 会成为瓶颈! 要有多线程的运用
    }
}

服务器端第四次改进:在这次改进的思路中,我们使用到多路复用的 reactor 线程模型。所谓的多路复用,可以简单理解成既可以处理连接请求,也可以处理数据处理,思路中的另外一点就是用两个线程组(每个线程组里面可能有一个或多个线程)来分别处理连接请求和数据读取。下面继承自 Thread 类的抽象类 ReactorThread,在重新 run 方法的时候,就会同时处理注册在当前线程的 selector 对象上的 OP_READ 和 OP_ACCEPT 事件,同时 ReactorThread 中又提供了一个 handler 抽象方法,这个可以在创建 ReactorThread 实例的时候再实现。为什么要提到 handler 方法呢?因为 run 方法中在收到自己感兴趣的事件的时候,会调用 handler 方法,那么同样是 ReactorThread 的实例,就可以做到不同的线程组做不同的事。

执行过程是:首先准备好两个线程组分别处理不同的事,即调用 newGroup() 方法;接着初始化 mainReactor(也就是负责处理连接请求的线程)并注册感兴趣的事件 OP_ACCEPT;最后是绑定端口。而 mainReactor 的工作原理是收到一个请求后,就分配一个 workReactor(工作线程)去处理,并将 OP_READ 事件注册到工作线程的 selector 上面。

/**
 * NIO selector 多路复用reactor线程模型
 */
public class NIOServerV3 {
    /** 处理业务操作的线程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**
     * 封装了selector.select()等事件轮询的代码
     */
    abstract class ReactorThread extends Thread {

        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        /**
         * Selector监听到有事件后,调用这个方法
         */
        public abstract void handler(SelectableChannel channel) throws Exception;

        private ReactorThread() throws IOException {
            selector = Selector.open();
        }

        volatile boolean running = false;

        @Override
        public void run() {
            // 轮询Selector事件
            while (running) {
                try {
                    // 执行队列中的任务
                    Runnable task;
                    while ((task = taskQueue.poll()) != null) {
                        task.run();
                    }
                    selector.select(1000);

                    // 获取查询结果
                    Set<SelectionKey> selected = selector.selectedKeys();
                    // 遍历查询结果
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        // 被封装的查询结果
                        SelectionKey key = iter.next();
                        iter.remove();
                        int readyOps = key.readyOps();
                        // 关注 Read 和 Accept两个事件
                        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                            try {
                                SelectableChannel channel = (SelectableChannel) key.attachment();
                                channel.configureBlocking(false);
                                handler(channel);
                                if (!channel.isOpen()) {
                                    key.cancel(); // 如果关闭了,就取消这个KEY的订阅
                                }
                            } catch (Exception ex) {
                                key.cancel(); // 如果有异常,就取消这个KEY的订阅
                            }
                        }
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private SelectionKey register(SelectableChannel channel) throws Exception {
            // 为什么register要以任务提交的形式,让reactor线程去处理?
            // 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
            // 而select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,
           // 就将注册的过程放入到任务队列中,当 reactor 的while开始跑之后,首先就是执行任务队列中的任务
            FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
            taskQueue.add(futureTask);
            return futureTask.get();
        }

        private void doStart() {
            if (!running) {
                running = true;
                start();
            }
        }
    }

    private ServerSocketChannel serverSocketChannel;
    // 1、创建多个线程 - accept处理reactor线程 (accept线程)
    private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    // 2、创建多个线程 - io处理reactor线程  (I/O线程)
    private ReactorThread[] subReactorThreads = new ReactorThread[8];

    /**
     * 初始化线程组
     */
    private void newGroup() throws IOException {
        // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
        for (int i = 0; i < subReactorThreads.length; i++) {
            subReactorThreads[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // work线程只负责处理IO处理,不处理accept事件
                    SocketChannel ch = (SocketChannel) channel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());

                    // TODO 业务操作 数据库、接口...
                    workPool.submit(() -> {
                    });

                    // 响应结果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };
        }

        // 创建mainReactor线程, 只负责处理serverSocketChannel
        for (int i = 0; i < mainReactorThreads.length; i++) {
            mainReactorThreads[i] = new ReactorThread() {
                AtomicInteger incr = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws Exception {
                    // 只做请求分发,不做具体的数据读取
                    ServerSocketChannel ch = (ServerSocketChannel) channel;
                    SocketChannel socketChannel = ch.accept();
                    socketChannel.configureBlocking(false);
                    // 收到连接建立的通知之后,分发给I/O线程继续去读取数据
                    int index = incr.getAndIncrement() % subReactorThreads.length;
                    ReactorThread workEventLoop = subReactorThreads[index];
                    workEventLoop.doStart();
                    SelectionKey selectionKey = workEventLoop.register(socketChannel);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
                }
            };
        }


    }

    /**
     * 初始化channel,并且绑定一个eventLoop线程
     *
     * @throws IOException IO异常
     */
    private void initAndRegister() throws Exception {
        // 1、 创建ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2、 将serverSocketChannel注册到selector
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }

    /**
     * 绑定端口
     *
     * @throws IOException IO异常
     */
    private void bind() throws IOException {
        //  1、 正式绑定端口,对外服务
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("启动完成,端口8080");
    }

    public static void main(String[] args) throws Exception {
        NIOServerV3 nioServerV3 = new NIOServerV3();
        nioServerV3.newGroup(); // 1、 创建main和sub两组线程
        nioServerV3.initAndRegister(); // 2、 创建serverSocketChannel,注册到mainReactor线程上的selector上
        nioServerV3.bind(); // 3、 为serverSocketChannel绑定端口
    }
}

至此,服务器端的改进过程也已经完成,此时的服务器就可以同时处理大量的连接请求了。

接下来我们来细说 NIO 中的三大核心组件。

 

(1)Channel

Java NIO的通道类似流,但又有些不同:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
  • 通道可以异步地读写。
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。

(2)Buffer

在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。 缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。

Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit) 

下面举个例子来理解下这3个重要的参数:

public static void main(String[] args) throws Exception {
		ByteBuffer b = ByteBuffer.allocate(15); // 15个字节大小的缓冲区
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		for (int i = 0; i < 10; i++) {
			// 存入10个字节数据
			b.put((byte) i);
		}
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		b.flip(); // 重置position
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		for (int i = 0; i < 5; i++) {
			System.out.print(b.get());
		}
		System.out.println();
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		b.flip();
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());

	}

整个过程如图:

此时position从0到10,capactiy和limit不变。

该操作会重置position,通常,将buffer从写模式转换为读 模式时需要执行此方法 flip()操作不仅重置了当前的position为0,还将limit设置到当前position的位置 。

limit的意义在于,来确定哪些数据是有意义的,换句话说,从position到limit之间的数据才是有意义的数据,因为是上次操作的数据。所以flip操作往往是读写转换的意思。

(3)Selector

Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作,一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

(4)API 说明

selector 的方法:

  • selectedKeys():选择就绪的 channel 
  • select():返回已经就绪的channel 数量

SelectionKey 的常用方法:

  • attchment():这个channel 携带的附件包
  • channel():获取这个 key 对应的 channel
  • cancel():取消它的就绪状态,避免被重复选择

Channel 的常用方法:

  • socket():获取 Socket 对象
  • configureBlocking():设置 channel 是否是阻塞式的,如果不是就设置为 false 即非阻塞式的
  • register():将 channel 注册到 selector 上面以供选择,可以使用重载方法添加附件包
  • read():从 buffer 中读取数据
  • write():往 buffer 中写数据
  • bind():绑定到哪个 IP 地址,端口
  • connect():连接到哪个 IP 地址、端口

Buffer 的常用方法:

  • array():将 buffer 中数据转换成字节数组
  • get():从 buffer 中获取数据
  • flip():重置 buffer 中的指针使其指向开头
  • put():往 buffer 中放置数据

 

3.AIO:异步 IO

NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

AIO的特点:

  • 读完了再通知我 
  • 不会加快IO,只是在读完后进行通知
  • 使用回调函数,进行业务处理 

异步的套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不需要过多的Selector对注册的通道进行轮询而是使用通知回调实现异步读写,从而简化了NIO的编程模型。

由于 NIO 的读写过程依然在应用线程里完成,所以对于那些读写时间长的,NIO 就不太适合

而 AIO 的读写过程完成后才会通知回调函数,所以 AIO 可以胜任那些重量级,读写时间长的任务。

使用最新提供的异步套接字通道 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel,过程如下:

(1)创建 AsynchronousServerSocketChannel 实例,并且绑定端口号,开始接收用户请求

AsynchronousServerSocketChannel serverChannel=AsynchronousServerSocketChannel.open();

serverChannel.bind(port);

serverChannel.accept(this,new AcceptHandler());

(2)接收到用户请求后读取位于 buffer 中的数据

public class AcceptHandler implements CompletionHandler<V,A>{



    @override

    public void completed(V v,A a){

        //继续接收用户的请求

        serverChannel.accept(...);

        //读取 buffer 中的数据

        v.read(buffer,buffer,new ReadHandler(v));

    }



    @override

    public void failed(V v,A a){

    }

}

 

(3)读取到数据处理完之后发送数据给客户端

public class ReadHandler implements CompletionHandler<V,A>{



    public void completed(){

        //读取到数据之后进行处理并且写回客户端

        channel.write(writebuffer,writebuffer,new WriteHandler(channel));

    }



    public void failed(){



    }

}

(4)数据全部写回之后再开始读取 buffer 中的数据

public class WriteHandler implements CompletionHandler<V,A>{



    public void completed(){

        if(buffer.hasremaining()){

            channel.write(writebuffer,writebuffer,this);//继续写

        }else{

            //又开始读取 buffer 中的数据

            channel.read(readBuffer,readBuffer,new ReaderHandler(channel))

        }

    }



    public void failed(){



    }

}

(5)客户端先是创建 AsynchronousSocketChannel 对象通道,然后连接到服务器端

AsynchronousSocketChannel clientChannel=AsynchronousSocketChannel.open();

clientChannel.connect(new InetSocketAddress(host, port), this, this);

(6)连接上之后发送数据给服务器端

clientChannel.write(writeBuffer, writeBuffer, new ClientWriteHandler(clientChannel));

 

(7)将数据全部发送给服务器端后开始接收服务器端数据

public class ClientWriteHandler implements CompletionHandler<V,A>{



    public void completed(){

        if(buffer.hasremaining()){

            clientChannel.write(writebuffer,writebuffer,this);//继续写

        }else{

            //又开始读取 buffer 中的数据

            clientChannel.read(readBuffer,readBuffer,new ClientReaderHandler(channel))

        }

    }


    
    public void failed(){



    }
}

(8)读取服务器端返回的数据

public class ClientReadHandler implements CompletionHandler<V,A>{



    public void completed(){



    }



    public void failed(){



    }

}