Java之IO,BIO,NIO,AIO

参考文献一

IO基础知识回顾

java的核心库java.io提供了全面的IO接口。包括:文件读写、标准设备输出等。Java中IO是以流为基础进行输入输出的,全部数据被串行化写入输出流,或者从输入流读入。java

java.nio(java non-blocking IO),nio 是non-blocking的简称,是jdk1.4 及以上版本里提供的新api(New IO) ,为全部的原始类型(boolean类型除外)提供缓存支持。Sun 官方标榜的特性以下: 为全部的原始类型提供(Buffer)缓存支持。字符集编码解码解决方案。 Channel :一个新的原始I/O 抽象。 支持锁和内存映射文件的文件访问接口。 提供多路(non-bloking) 非阻塞式的高伸缩性网络I/O 。react

IO流类图结构express

这里写图片描述

IO流简单例子编程

实例一:后端

FileInputStream fis=null;
FileOutputStream fos=null;
try {
    fis = new FileInputStream(new File("D:\\a.txt"));
    fos = new FileOutputStream(new File("D:\\y.txt"));
    int ch;
    while((ch=fis.read()) != -1){
        System.out.println((char)ch);
        fos.write(ch);
    }
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}finally {
    if(null != fos){
        fos.close();
    }
    if(null != fis){
        fis.close();
    }
}

这里写图片描述

实例二:字节流转换成字符流api

public static void main(String[] args) throws Exception{
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream("D:\\a.txt")));
            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("D:\\y.txt")));
            String s;
            StringBuilder sb = new StringBuilder();
            while((s=br.readLine())!=null){
                System.out.println(s);
                bw.write(s);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(null != bw){
                bw.close();
            }
            if(null != br){
                br.close();
            }
        }
    }

实例三:用转换流从控制台上读入数据数组

public static void main(String[] args) throws Exception{
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(System.in));
            String s=br.readLine();
            System.out.println(s);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(null != br){
                br.close();
            }
        }
    }

BIO编程

传统BIO通讯模型图缓存

​ 传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起链接操做。链接成功后,双方经过输入和输出流进行同步阻塞式通讯。 服务端提供IP和监听端口,客户端经过链接操做想服务端监听的地址发起链接请求,经过三次握手链接,若是链接成功创建,双方就能够经过套接字进行通讯。服务器

​ 简单的描述一下BIO的服务端通讯模型:采用BIO通讯模型的服务端,一般由一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求以后为每一个客户端建立一个新的线程进行链路处理没处理完成后,经过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。网络

​ 传统BIO通讯模型图:

​ 01

​ 该模型最大的问题就是缺少弹性伸缩能力,当客户端并发访问量增长后,服务端的线程个数和客户端并发访问数呈1:1的正比关系Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧降低,随着访问量的继续增大,系统最终就死-掉-了

传统BIO编程实例

传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起链接操做。链接成功后,双方经过输入和输出流进行同步阻塞式通讯。

package com.evada.de;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * 描述:传统BIO编程实例
 * @author Ay
 * @date 2017/6/27
 */
public final class AyTest extends BaseTest {

    public static void main(String[] args) throws InterruptedException {
        //启动线程,运行服务器
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    ServerBetter.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //避免客户端先于服务器启动前执行代码
        Thread.sleep(100);

        //启动线程,运行客户端
        char operators[] = {'+', '-', '*', '/'};
        Random random = new Random(System.currentTimeMillis());
        new Thread(new Runnable() {
            @SuppressWarnings("static-access")
            @Override
            public void run() {
                while (true) {
                    //随机产生算术表达式
                    String expression = random.nextInt(10) + "" + operators[random.nextInt(4)] + (random.nextInt(10) + 1);
                    Client.send(expression);
                    try {
                        Thread.currentThread().sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

}

class ServerBetter{

    //默认的端口号
    private static int DEFAULT_PORT = 12345;
    //单例的ServerSocket
    private static ServerSocket server;

    //根据传入参数设置监听端口,若是没有参数调用如下方法并使用默认值
    public static void start() throws IOException {
        //使用默认值端口
        start(DEFAULT_PORT);
    }
    //这个方法不会被大量并发访问,不太须要考虑效率,直接进行方法同步就好了
    public synchronized static void start(int port) throws IOException{
        if(server != null) return;
        try{
            //经过构造函数建立ServerSocket,若是端口合法且空闲,服务端就监听成功
            server = new ServerSocket(port);
            System.out.println("服务器已启动,端口号:" + port);
            //经过无线循环监听客户端链接,若是没有客户端接入,将阻塞在accept操做上。
            while(true){
                Socket socket = server.accept();
                //当有新的客户端接入时,会执行下面的代码
                //而后建立一个新的线程处理这条Socket链路
                new Thread(new ServerHandler(socket)).start();
            }
        }finally{
            //一些必要的清理工做
            if(server != null){
                System.out.println("服务器已关闭。");
                server.close();
                server = null;
            }
        }
    }

}


class ServerHandler implements Runnable{
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            String expression;
            String result;
            while(true){
                //经过BufferedReader读取一行
                //若是已经读到输入流尾部,返回null,退出循环
                //若是获得非空值,就尝试计算结果并返回
                if((expression = in.readLine())==null) break;
                System.out.println("服务器收到消息:" + expression);
                try{
                    result = "123";//Calculator.cal(expression).toString();
                }catch(Exception e){
                    result = "计算错误:" + e.getMessage();
                }
                out.println(result);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //一些必要的清理工做
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

class Client {
    //默认的端口号
    private static int DEFAULT_SERVER_PORT = 12345;
    //默认服务器Ip
    private static String DEFAULT_SERVER_IP = "127.0.0.1";

    public static void send(String expression){
        send(DEFAULT_SERVER_PORT,expression);
    }
    public static void send(int port,String expression){
        System.out.println("算术表达式为:" + expression);
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            socket = new Socket(DEFAULT_SERVER_IP,port);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println(expression);
            System.out.println("___结果为:" + in.readLine());
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //一下必要的清理工做
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

伪异步I/O编程

咱们可使用线程池来管理这些线程(须要了解更多请参考前面提供的文章),实现1个或多个线程处理N个客户端的模型(可是底层仍是使用的同步阻塞I/O),一般被称为“伪异步I/O模型“

伪异步I/O编程模型图

这里写图片描述

测试运行结果是同样的。

​ 咱们知道,若是使用CachedThreadPool线程池(不限制线程数量,若是不清楚请参考文首提供的文章),其实除了能自动帮咱们管理线程(复用),看起来也就像是1:1的客户端:线程数模型,而使用FixedThreadPool咱们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N:M的伪异步I/O模型。

​ 可是,正由于限制了线程数量,若是发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程能够被复用。而对Socket的输入流就行读取时,会一直阻塞,直到发生:

  • ​ 有数据可读
  • ​ 可用数据以及读取完毕
  • ​ 发生空指针或I/O异常

​ 因此在读取数据较慢时(好比数据量大、网络传输慢等),大量并发的状况下,其余接入的消息,只能一直等待,这就是最大的弊端。

​ 然后面即将介绍的NIO,就能解决这个难题。

伪异步IO编程代码

package com.anxpp.io.calculator.bio;  
import java.io.IOException;  
import java.net.ServerSocket;  
import java.net.Socket;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
/** 
 * BIO服务端源码__伪异步I/O 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */  
public final class ServerBetter {  
    //默认的端口号  
    private static int DEFAULT_PORT = 12345;  
    //单例的ServerSocket  
    private static ServerSocket server;  
    //线程池 懒汉式的单例  
    private static ExecutorService executorService = Executors.newFixedThreadPool(60);  
    //根据传入参数设置监听端口,若是没有参数调用如下方法并使用默认值  
    public static void start() throws IOException{  
        //使用默认值  
        start(DEFAULT_PORT);  
    }  
    //这个方法不会被大量并发访问,不太须要考虑效率,直接进行方法同步就好了  
    public synchronized static void start(int port) throws IOException{  
        if(server != null) return;  
        try{  
            //经过构造函数建立ServerSocket  
            //若是端口合法且空闲,服务端就监听成功  
            server = new ServerSocket(port);  
            System.out.println("服务器已启动,端口号:" + port);  
            //经过无线循环监听客户端链接  
            //若是没有客户端接入,将阻塞在accept操做上。  
            while(true){  
                Socket socket = server.accept();  
                //当有新的客户端接入时,会执行下面的代码  
                //而后建立一个新的线程处理这条Socket链路  
                executorService.execute(new ServerHandler(socket));  
            }  
        }finally{  
            //一些必要的清理工做  
            if(server != null){  
                System.out.println("服务器已关闭。");  
                server.close();  
                server = null;  
            }  
        }  
    }  
}
  • 1,同步和异步是针对应用程序和内核的交互而言的。
  • 2,阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操做的就绪状态来采起的不一样方式,说白了是一种读取或者写入操做函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会当即返回一个状态值。

由上描述基本能够总结一句简短的话,同步和异步是目的,阻塞和非阻塞是实现方式

同步阻塞: 
在此种方式下,用户进程在发起一个IO操做之后,必须等待IO操做的完成,只有当真正完成了IO操做之后,用户进程才能运行。JAVA传统的IO模型属于此种方式。

同步非阻塞: 
在此种方式下,用户进程发起一个IO操做之后边可返回作其它事情,可是用户进程须要时不时的询问IO操做是否就绪,这就要求用户进程不停的去询问,从而引入没必要要的CPU资源浪费。其中目前JAVA的NIO就属于同步非阻塞IO。 
异步: 
此种方式下是指应用发起一个IO操做之后,不等待内核IO操做的完成,等内核完成IO操做之后会通知应用程序。

若是你想吃一份宫保鸡丁盖饭:

同步阻塞:你到饭馆点餐,而后在那等着,还要一边喊:好了没啊!

同步非阻塞:在饭馆点完餐,就去遛狗了。不过溜一下子,就回饭馆喊一声:好了没啊!

异步阻塞:遛狗的时候,接到饭馆电话,说饭作好了,让您亲自去拿。

异步非阻塞:饭馆打电话说,咱们知道您的位置,一会给你送过来,安心遛狗就能够了。

NIO 编程

简介

Java NIO(New IO)是一个能够替代标准Java IO API(从Java 1.4开始),Java NIO提供了与标准IO不一样的IO工做方式。

Java NIO 由如下几个核心部分组成:

  • Channels
  • Buffers
  • Selectors

虽然Java NIO 中除此以外还有不少类和组件,但Channel,Buffer 和 Selector 构成了核心的API。其它组件,如Pipe和FileLock,只不过是与三个核心组件共同使用的工具类。所以,我将集中精力在这三个组件上。其它组件会在单独的章节中讲到。

注意(每一个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送响应)

很是形象的实例

小量的线程如何同时为大量链接服务呢,答案就是就绪选择。这就比如到餐厅吃饭,每来一桌客人,都有一个服务员专门为你服务,从你到餐厅到结账走人,这样方式的好处是服务质量好,一对一的服务,VIP啊,但是缺点也很明显,成本高,若是餐厅生意好,同时来100桌客人,就须要100个服务员,那老板发工资的时候得心痛死了,这就是传统的一个链接一个线程的方式。

老板是什么人啊,精着呢。这老板就得捉摸怎么能用10个服务员同时为100桌客人服务呢,老板就发现,服务员在为客人服务的过程当中并非一直都忙着,客人点完菜,上完菜,吃着的这段时间,服务员就闲下来了,但是这个服务员仍是被这桌客人占用着,不能为别的客人服务,用华为领导的话说,就是工做不饱满。那怎么把这段闲着的时间利用起来呢。这餐厅老板就想了一个办法,让一个服务员(前台)专门负责收集客人的需求,登记下来,好比有客人进来了、客人点菜了,客人要结账了,都先记录下来按顺序排好。每一个服务员到这里领一个需求,好比点菜,就拿着菜单帮客人点菜去了。点好菜之后,服务员立刻回来,领取下一个需求,继续为别人客人服务去了。这种方式服务质量就不如一对一的服务了,当客人数据不少的时候可能须要等待。但好处也很明显,因为在客人正吃饭着的时候服务员不用闲着了,服务员这个时间内能够为其余客人服务了,原来10个服务员最多同时为10桌客人服务,如今可能为50桌,10客人服务了。

这种服务方式跟传统的区别有两个:

一、增长了一个角色,要有一个专门负责收集客人需求的人。NIO里对应的就是Selector。

二、由阻塞服务方式改成非阻塞服务了,客人吃着的时候服务员不用一直侯在客人旁边了。传统的IO操做,好比read(),当没有数据可读的时候,线程一直阻塞被占用,直到数据到来。NIO中没有数据可读时,read()会当即返回0,线程不会阻塞。

NIO中,客户端建立一个链接后,先要将链接注册到Selector,至关于客人进入餐厅后,告诉前台你要用餐,前台会告诉你你的桌号是几号,而后你就可能到那张桌子坐下了,SelectionKey就是桌号。当某一桌须要服务时,前台就记录哪一桌须要什么服务,好比1号桌要点菜,2号桌要结账,服务员从前台取一条记录,根据记录提供服务,完了再来取下一条。这样服务的时间就被最有效的利用起来了。

工做原理

这里写图片描述

Java NIO和IO的主要区别

IO NIO
Stream oriented Buffer oriented
Blocking IO Non blocking IO
  Selectors

面向流与面向缓冲

Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的

Java IO面向流意味着每次从流中读一个或多个字节,直至读取全部字节,它们没有被缓存在任何地方。此外,它不能先后移动流中的数据。若是须要先后移动从流中读取的数据,须要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不一样。数据读取到一个它稍后处理的缓冲区,须要时可在缓冲区中先后移动。这就增长了处理过程当中的灵活性。可是,还须要检查是否该缓冲区中包含全部您须要处理的数据。并且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里还没有处理的数据。

阻塞与非阻塞IO

Java IO的各类流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据彻底写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,可是它仅能获得目前可用的数据,若是目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,因此直至数据变的能够读取以前,该线程能够继续作其余的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不须要等待它彻底写入,这个线程同时能够去作别的事情。 线程一般将非阻塞IO的空闲时间用于在其它通道上执行IO操做,因此一个单独的线程如今能够管理多个输入和输出通道(channel)。

NIO和IO如何影响应用程序的设计

不管您选择IO或NIO工具箱,可能会影响您应用程序设计的如下几个方面:

  • 对NIO或IO类的API调用。
  • 数据处理。
  • 用来处理数据的线程数。

通道 Channel

简介

Channel 是对数据的源头和数据目标点流经途径的抽象,在这个意义上和 InputStream 和 OutputStream 相似。Channel能够译为“通道、管 道”,而传输中的数据仿佛就像是在其中流淌的水。前面也提到了Buffer,Buffer和Channel相互配合使用,才是Java的NIO。

Java NIO的通道与流区别

  • 既能够从通道中读取数据,又能够写数据到通道。但流的读写一般是单向的。

  • 通道能够异步地读写。

  • 通道中的数据老是要先读到一个Buffer,或者老是要从一个Buffer中写入。

咱们对数据的读取和写入要经过Channel,它就像水管同样,是一个通道。通道不一样于流的地方就是通道是双向的,能够用于读、写和同时读写操做。 数据能够从Channel读到Buffer中,也能够从Buffer 写到Channel中。

这里写图片描述

注意:通道必须结合Buffer使用,不能直接向通道中读/写数据

Channel主要分类

广义上来讲通道能够被分为两类:File I/O和Stream I/O,也就是文件通道和套接字通道。若是分的更细致一点则是:

  • FileChannel 从文件读写数据
  • SocketChannel 经过TCP读写网络数据
  • ServerSocketChannel 能够监听新进来的TCP链接,并对每一个连接建立对应的SocketChannel
  • DatagramChannel 经过UDP读写网络中的数据
  • Pipe

Channel的实现

这些是Java NIO中最重要的通道的实现:

  • FileChannel:从文件中读写数据。
  • DatagramChannel:能经过UDP读写网络中的数据。
  • SocketChannel:能经过TCP读写网络中的数据。
  • ServerSocketChannel:能够监听新进来的TCP链接,像Web服务器那样。对每个新进来的链接都会建立一个SocketChannel。

打开FileChannel

在使用FileChannel以前,必须先打开它。可是,咱们没法直接打开一个FileChannel,须要经过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例。下面是经过RandomAccessFile打开FileChannel的示例:

RandomAccessFile aFile = new RandomAccessFile("d://nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

从FileChannel读取数据

调用多个read()方法之一从FileChannel中读取数据。如:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

首先,分配一个Buffer。从FileChannel中读取的数据将被读到Buffer中。

而后,调用FileChannel.read()方法。该方法将数据从FileChannel读取到Buffer中。read()方法返回的int值表示了有多少字节被读到了Buffer中。若是返回-1,表示到了文件末尾。

向FileChannel写数据

使用FileChannel.write()方法向FileChannel写数据,该方法的参数是一个Buffer。如:

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

注意FileChannel.write()是在while循环中调用的。由于没法保证write()方法一次能向FileChannel写入多少字节,所以须要重复调用write()方法,直到Buffer中已经没有还没有写入通道的字节。

关闭FileChannel

用完FileChannel后必须将其关闭。如:

channel.close();

FileChannel的position方法

有时可能须要在FileChannel的某个特定位置进行数据的读/写操做。能够经过调用position()方法获取FileChannel的当前位置。

也能够经过调用position(long pos)方法设置FileChannel的当前位置。

这里有两个例子:

long pos = channel.position();
channel.position(pos +123);

若是将位置设置在文件结束符以后,而后试图从文件通道中读取数据,读方法将返回-1 —— 文件结束标志。

若是将位置设置在文件结束符以后,而后向通道中写数据,文件将撑大到当前位置并写入数据。这可能致使“文件空洞”,磁盘上物理文件中写入的数据间有空隙。

FileChannel的size方法

FileChannel实例的size()方法将返回该实例所关联文件的大小。如:

long fileSize = channel.size();

FileChannel的truncate方法

可使用FileChannel.truncate()方法截取一个文件。截取文件时,文件将中指定长度后面的部分将被删除。如:

channel.truncate(1024);

这个例子截取文件的前1024个字节。

FileChannel的force方法

FileChannel.force()方法将通道里还没有写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操做系统会将数据缓存在内存中,因此没法保证写入到FileChannel里的数据必定会即时写到磁盘上。要保证这一点,须要调用force()方法。

force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

下面的例子同时将文件数据和元数据强制写到磁盘上:

channel.force(true);

transferFrom()

FileChannel没法设置为非阻塞模式,它老是运行在阻塞模式下。

FileChannel的transferFrom()方法能够将数据从源通道传输到FileChannel中(译者注:这个方法在JDK文档中的解释为将字节从给定的可读取字节通道传输到此通道的文件中)。下面是一个简单的例子:

//在使用FileChannel以前,必须先打开它。可是,咱们没法直接打开一个FileChannel,
//须要经过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例。
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");  
FileChannel      fromChannel = fromFile.getChannel();  
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");  
FileChannel      toChannel = toFile.getChannel();   
long position = 0;  
long count = fromChannel.size();  
toChannel.transferFrom(position, count, fromChannel);

transferFrom 方法的输入参数 position 表示从 position 处开始向目标文件写入数据,count 表示最多传输的字节数。若是源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数。

此外要注意,在 SoketChannel 的实现中,SocketChannel 只会传输此刻准备好的数据(可能不足count字节)。所以,SocketChannel 可能不会将请求的全部数据(count个字节)所有传输到 FileChannel 中。

transferTo()

transferTo()方法将数据从FileChannel传输到其余的channel中。下面是一个简单的例子:

RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");  
FileChannel      fromChannel = fromFile.getChannel();    
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");  
FileChannel      toChannel = toFile.getChannel();  
long position = 0;  
long count = fromChannel.size();  
fromChannel.transferTo(position, count, toChannel);

是否是发现这个例子和前面那个例子特别类似?除了调用方法的FileChannel对象不同外,其余的都同样。

上面所说的关于SocketChannel的问题在transferTo()方法中一样存在。SocketChannel会一直传输数据直到目标buffer被填满。

Channel简单实例

下面是Channel的一个简单的实例:

程序清单 1-1
RandomAccessFile aFile = new RandomAccessFile("d:\\ay.txt", "rw");
FileChannel fileChannel = aFile.getChannel();
//分配缓存区大小
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = fileChannel.read(buf);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的调用,首先读取数据到Buffer,而后反转Buffer,接着再从Buffer中读取数据(注:flip:空翻,反转)
    buf.flip();
    //判断是否有剩余(注:Remaining:剩余的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = fileChannel.read(buf);
}
aFile.close();

缓冲区 Buffer

缓冲区本质上是一块能够写入数据,而后能够从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

Buffer的基本用法

使用Buffer读写数据通常遵循如下四个步骤:

  • 写入数据到Buffer
  • 调用flip()方法
  • 从Buffer中读取数据
  • 调用clear()方法或者compact()方法

当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,须要经过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,能够读取以前写入到buffer的全部数据。

一旦读完了全部的数据,就须要清空缓冲区,让它能够再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

程序清单 1-1
RandomAccessFile aFile = new RandomAccessFile("d:\\ay.txt", "rw");
FileChannel fileChannel = aFile.getChannel();
//分配缓存区大小
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = fileChannel.read(buf);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的调用,首先读取数据到Buffer,而后反转Buffer,接着再从Buffer中读取数据(注:flip:空翻,反转)
    buf.flip();
    //判断是否有剩余(注:Remaining:剩余的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = fileChannel.read(buf);
}
aFile.close();

Buffer的三个属性

为了理解Buffer的工做原理,须要熟悉它的三个属性:

  • capacity:做为一个内存块,Buffer 有一个固定的大小值,也叫 “capacity”. 你只能往里写 capacity 个 byte、long,char 等类型。一旦 Buffer 满了,须要将其清空(经过读数据或者清除数据)才能继续写数据往里写数据。
  • position:当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的 Buffer 单元。position 最大可为 capacity – 1。 当读取数据时,也是从某个特定位置读。当将 Buffer 从写模式切换到读模式,position会被重置为 0。当从Buffer的 position 处读取数据时,position 向前移动到下一个可读的位置。
  • limit:在写模式下,Buffer的limit表示你最多能往 Buffer 里写多少数据。 写模式下,limit 等于 Buffer 的 capacity 。 当切换Buffer到读模式时, limit 表示你最多能读到多少数据。所以,当切换Buffer到读模式时,limit 会被设置成写模式下的 position 值。换句话说,你能读到以前写入的全部数据(limit被设置成已写数据的数量,这个值在写模式下就是 position )。

这里写图片描述

Buffer的类型

Java NIO 有如下Buffer类型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer的分配

要想得到一个Buffer对象首先要进行分配。 每个Buffer类都有一个allocate方法。下面是一个分配48字节capacity的ByteBuffer的例子。

ByteBuffer buf = ByteBuffer.allocate(48);

这是分配一个可存储1024个字符的CharBuffer:

CharBuffer buf = CharBuffer.allocate(1024);

Buffer写数据

写数据到Buffer有两种方式:

  • 从Channel写到Buffer。
  • 经过Buffer的put()方法写到Buffer里。

从Channel写到Buffer,例如:

int bytesRead = inChannel.read(buf); //read into buffer

经过put方法写Buffer的例子:

buf.put(127);

put方法有不少版本,容许你以不一样的方式把数据写入到Buffer中。例如, 写到一个指定的位置,或者把一个字节数组写入到Buffer。 更多Buffer实现的细节参考JavaDoc。

flip()方法

flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成以前position的值。

换句话说,position如今用于标记读的位置,limit表示以前写进了多少个byte、char等 —— 如今能读取多少个byte、char等。

Buffer中读取数据

从Buffer中读取数据有两种方式:

  • 从Buffer读取数据到Channel。
  • 使用get()方法从Buffer中读取数据。

从Buffer读取数据到Channel的例子:

//read from buffer into channel.  
int bytesWritten = inChannel.write(buf);

使用get()方法从Buffer中读取数据的例子 :

byte aByte = buf.get();

get方法有不少版本,容许你以不一样的方式从Buffer中读取数据。例如,从指定position读取,或者从Buffer中读取数据到字节数组。

rewind()方法

Buffer.rewind()将 position 设回0,因此你能够重读Buffer中的全部数据。limit 保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)

clear()与compact()方法

一旦读完Buffer中的数据,须要让Buffer准备好再次被写入。能够经过clear()或compact()方法来完成。

若是调用的是 clear() 方法,position将被设回 0,limit被设置成 capacity 的值。换句话说,Buffer 被清空了。

若是Buffer中有一些未读的数据,调用clear()方法,数据将“被遗忘”,意味着再也不有任何标记会告诉你哪些数据被读过,哪些尚未。

若是Buffer中仍有未读的数据,且后续还须要这些数据,可是此时想要先先写些数据,那么使用compact()方法。

compact()方法将全部未读的数据拷贝到Buffer起始处。而后将position设到最后一个未读元素正后面。limit 属性依然像 clear() 方法同样,设置成 capacity。如今Buffer准备好写数据了,可是不会覆盖未读的数据。

mark()与reset()方法

经过调用Buffer.mark()方法,能够标记Buffer中的一个特定position。以后能够经过调用Buffer.reset()方法恢复到这个position。例如:

buffer.mark();  
//set position back to mark.  
buffer.reset();  
equals()与compareTo()方法

可使用equals()和compareTo()方法两个Buffer。

equals()

当知足下列条件时,表示两个Buffer相等:

  • 有相同的类型(byte、char、int等)。
  • Buffer中剩余的 byte、char 等的个数相等。
  • Buffer中全部剩余的byte、char等都相同。

如你所见,equals只是比较Buffer的一部分,不是每个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。

compareTo()方法

compareTo()方法比较两个Buffer的剩余元素(byte、char等), 若是知足下列条件,则认为一个Buffer “小于” 另外一个Buffer:

  • 第一个不相等的元素小于另外一个Buffer中对应的元素。
  • 全部元素都相等,但第一个Buffer比另外一个先耗尽(第一个Buffer的元素个数比另外一个少)。

选择器( Selector)

简单介绍

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(好比:链接打开,数据到达)。Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,若是某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,而后经过SelectionKey能够获取就绪Channel的集合,进行后续的I/O操做。

一个Selector能够同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,因此没有最大链接句柄1024/2048的限制。因此,只须要一个线程负责Selector的轮询,就能够接入成千上万的客户端。

这里写图片描述

要使用Selector,得向 Selector 注册 Channel ,而后调用它的 select() 方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就能够处理这些事件,事件的例子好比新链接进来,数据接收等。

Selector的建立

经过调用Selector.open()方法建立一个Selector,以下:

Selector selector = Selector.open();

Selector注册通道

为了将 Channel 和 Selector 配合使用,必须将 channel 注册到 selector 上。经过 SelectableChannel.register() 方法来实现,以下:

channel.configureBlocking(false);  
SelectionKey key = channel.register(selector,  Selectionkey.OP_READ);

与 Selector 一块儿使用时,Channel 必须处于非阻塞模式下。这意味着不能将 FileChannel 与 Selector 一块儿使用,由于 FileChannel 不能切换到非阻塞模式。而套接字通道均可以。

注意register()方法的第二个参数。这是一个“interest集合”,意思是在经过Selector监听Channel时对什么事件感兴趣。能够监听四种不一样类型的事件:

  • Connect
  • Accept
  • Read
  • Write

通道触发了一个事件意思是该事件已经就绪。因此,某个channel成功链接到另外一个服务器称为“链接就绪”。一个 server socket channel 准备好接收新进入的链接称为“接收就绪”。一个有数据可读的通道能够说是“读就绪”。等待写数据的通道能够说是“写就绪”。

这四种事件用 SelectionKey 的四个常量来表示:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

若是你对不止一种事件感兴趣,那么能够用 “ 位 或 ” 操做符将常量链接起来,以下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

在上一小节中,当向Selector注册Channel时,register() 方法会返回一个SelectionKey对象。这个对象包含了一些你感兴趣的属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)

下面我会描述这些属性。

interest集合

就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。能够经过 SelectionKey 读写 interest 集合,像这样:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

能够看到,用“位与”操做interest 集合和给定的 SelectionKey 常量,能够肯定某个肯定的事件是否在 interest 集合中

ready集合

ready 集合是通道已经准备就绪的操做的集合。在一次选择(Selection)以后,你会首先访问这个readySet。Selection将在下一小节进行解释。能够这样访问ready集合:

int readySet = selectionKey.readyOps();

能够用像检测 interest 集合那样的方法,来检测channel中什么事件或操做已经就绪。可是,也可使用如下四个方法,它们都会返回一个布尔类型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

从SelectionKey访问Channel和Selector很简单。以下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

附加的对象

能够将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,能够附加 与通道一块儿使用的Buffer,或是包含汇集数据的某个对象。使用方法以下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还能够在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

经过Selector选择通道

一旦向Selector注册了一或多个通道,就能够调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如链接、接受、读或写)已经准备就绪的那些通道。换句话说,若是你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

下面是select()方法:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞到至少有一个通道在你注册的事件上就绪了。

select(long timeout) 和 select() 同样,除了最长会阻塞 timeout 毫秒(参数)。

selectNow() 不会阻塞,无论什么通道就绪都马上返回(译者注:此方法执行非阻塞的选择操做。若是自从前一次选择操做后,没有通道变成可选择的,则此方法直接返回零。)。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。若是调用select()方法,由于有一个通道变成就绪状态,返回了1,若再次调用select()方法,若是另外一个通道就绪了,它会再次返回1。若是对第一个就绪的channel没有作任何操做,如今就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

selectedKeys()

一旦调用了select()方法,而且返回值代表有一个或更多个通道就绪了,而后能够经过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。以下所示:

Set selectedKeys = selector.selectedKeys();

当向 Selector 注册 Channel 时,Channel.register() 方法会返回一个 SelectionKey 对象。这个对象表明了注册到该Selector的通道。能够经过SelectionKey的selectedKeySet()方法访问这些对象。

能够遍历这个已选择的键集合来访问就绪的通道。以下:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每一个键,并检测各个键所对应的通道的就绪事件。

注意每次迭代末尾的 keyIterator.remove() 调用。Selector不会本身从已选择键集中移除 SelectionKey 实例。必须在处理完通道时本身移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

SelectionKey.channel() 方法返回的通道须要转型成你要处理的类型,如 ServerSocketChannel 或 SocketChannel 等。

wakeUp()

某个线程调用select()方法后阻塞了,即便没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法便可。阻塞在select()方法上的线程会立马返回。

若是有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会当即“醒来(wake up)”。

close()

用完 Selector 后调用其 close() 方法会关闭该 Selector,且使注册到该Selector上的全部SelectionKey实例无效。通道自己并不会关闭。

完整的示例

这里有一个完整的示例,打开一个Selector,注册一个通道注册到这个Selector上(通道的初始化过程略去),而后持续监控这个Selector的四种事件(接受,链接,读,写)是否就绪。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
  SelectionKey key = keyIterator.next();
  if(key.isAcceptable()) {
    // a connection was accepted by a ServerSocketChannel.
  } else if (key.isConnectable()) {
    // a connection was established with a remote server.
  } else if (key.isReadable()) {
    // a channel is ready for reading
  } else if (key.isWritable()) {
    // a channel is ready for writing
  }
    keyIterator.remove();
  }
}

分散(Scatter)/汇集(Gather)

分散概念

分散(scatter):从Channel中读取是指在读操做时将读取的数据写入多个buffer中。所以,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。

这里写图片描述

程序清单 1-1
ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
ByteBuffer[] bufferArray = { header, body };  
channel.read(bufferArray);

注意buffer首先被插入到数组,而后再将数组做为 channel.read() 的输入参数。read() 方法按照 buffer 在数组中的顺序将从 channel 中读取的数据写入到buffer,当一个 buffer 被写满后,channel 紧接着向另外一个 buffer 中写。

Scattering Reads在移动下一个buffer前,必须填满当前的buffer,这也意味着它不适用于动态消息(译者注:消息大小不固定)。换句话说,若是存在消息头和消息体,消息头必须完成填充(例如 128byte),Scattering Reads才能正常工做。

汇集概念

这里写图片描述

汇集(gather):写入Channel是指在写操做时将多个buffer的数据写入同一个Channel,所以,Channel 将多个Buffer中的数据“汇集(gather)”后发送到Channel。

示例1-1
ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
//write data into buffers  
ByteBuffer[] bufferArray = { header, body };  
channel.write(bufferArray);

buffer的一个数组被传递给了 write() 方法,这个方法写他们在数组中遇到的接下来的 buffer 的内容。只是这些数据在 buffer 的 position 和 limit 直接被写。所以,若是一个buffer有一个128字节的容量,可是只包含了58个字节,只有58个字节能够从 buffer 中写到 channel 。所以,一个汇集写操做经过动态可变大小的消息部分会工做的很好,跟分散读取正好相反。

分散/汇集的应用

scatter / gather常常用于须要将传输的数据分开处理的场合。例如,您可能在编写一个使用消息对象的网络应用程序,每个消息被划分为固定长度的头部和固定长度的正文。您能够建立一个恰好能够容纳头部的缓冲区和另外一个恰好能够容纳正文的缓冲区。当您将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。

咱们从缓冲区所获得的方便性对于缓冲区数组一样有效。由于每个缓冲区都跟踪本身还能够接受多少数据,因此分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。

简单小例子

RandomAccessFile raf1=new RandomAccessFile("d:\\ay.txt", "rw");
//获取通道
FileChannel channel1 = raf1.getChannel();
//设置缓冲区
ByteBuffer buf1=ByteBuffer.allocate(50);
ByteBuffer buf2=ByteBuffer.allocate(1024);
//分散读取的时候缓存区应该是有序的,因此把几个缓冲区加入数组中
ByteBuffer[] bufs={buf1,buf2};
//通道进行传输
channel1.read(bufs);
//查看缓冲区中的内容
for (int i = 0; i < bufs.length; i++) {
   //切换为读模式
   bufs[i].flip();
}
System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));
System.out.println();
System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));
//汇集写入
RandomAccessFile  raf2=new RandomAccessFile("d:\\al.txt", "rw");
FileChannel channel2 = raf2.getChannel();
//只能经过通道来进行写入
channel2.write(bufs);

其余通道

文件通道

Socket管道

Java NIO中的 SocketChannel 是一个链接到 TCP 网络套接字的通道。能够经过如下2种方式建立 SocketChannel:

  • 打开一个SocketChannel并链接到互联网上的某台服务器。
  • 一个新链接到达 ServerSocketChannel 时,会建立一个 SocketChannel。

打开 SocketChannel

下面是SocketChannel的打开方式:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com",80));

从 SocketChannel 读取数据

要从SocketChannel中读取数据,调用一个read()的方法之一。如下是例子:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。

而后,调用SocketChannel.read()。该方法将数据从SocketChannel 读到Buffer中。read()方法返回的int值表示读了多少字节进Buffer里。若是返回的是-1,表示已经读到了流的末尾(链接关闭了)。

写入 SocketChannel

写数据到SocketChannel用的是SocketChannel.write()方法,该方法以一个Buffer做为参数。示例以下:

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

注意SocketChannel.write()方法的调用是在一个while循环中的。Write()方法没法保证能写多少字节到SocketChannel。因此,咱们重复调用write()直到Buffer没有要写的字节为止。

非阻塞模式

能够设置 SocketChannel 为非阻塞模式(non-blocking mode).设置以后,就能够在异步模式下调用connect(), read() 和write()了。

connect()

若是SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在链接创建以前就返回了。为了肯定链接是否创建,能够调用finishConnect()的方法。像这样:

socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}

write()

非阻塞模式下,write()方法在还没有写出任何内容时可能就返回了。因此须要在循环中调用write()。前面已经有例子了,这里就不赘述了。

read()

非阻塞模式下,read()方法在还没有读取到任何数据时可能就返回了。因此须要关注它的int返回值,它会告诉你读取了多少字节。

不错的小例子

一下是来自网络的一个小例子,我的以为很不错,就贴到这里。

class NioClient {
    //管道管理器
    private Selector selector;

    public NioClient init(String serverIp, int port) throws IOException{
        //获取socket通道
        SocketChannel channel = SocketChannel.open();

        channel.configureBlocking(false);
        //得到通道管理器
        selector=Selector.open();

        //客户端链接服务器,须要调用channel.finishConnect();才能实际完成链接。
        channel.connect(new InetSocketAddress(serverIp, port));
        //为该通道注册SelectionKey.OP_CONNECT事件
        channel.register(selector, SelectionKey.OP_CONNECT);
        return this;
    }

    public void listen() throws IOException{
        System.out.println("客户端启动");
        //轮询访问selector
        while(true){
            //选择注册过的io操做的事件(第一次为SelectionKey.OP_CONNECT)
            selector.select();
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while(ite.hasNext()){
                SelectionKey key = ite.next();
                //删除已选的key,防止重复处理
                ite.remove();
                if(key.isConnectable()){
                    SocketChannel channel=(SocketChannel)key.channel();

                    //若是正在链接,则完成链接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();
                    }

                    channel.configureBlocking(false);
                    //向服务器发送消息
                    channel.write(ByteBuffer.wrap(new String("send message to server.").getBytes()));

                    //链接成功后,注册接收服务器消息的事件
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("客户端链接成功");
                }else if(key.isReadable()){ //有可读数据事件。
                    SocketChannel channel = (SocketChannel)key.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    channel.read(buffer);
                    byte[] data = buffer.array();
                    String message = new String(data);

                    System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);
//                    ByteBuffer outbuffer = ByteBuffer.wrap(("client.".concat(msg)).getBytes());
//                    channel.write(outbuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new NioClient().init("127.0.0.1", 9981).listen();
    }
}



class NioServer {
    //通道管理器
    private Selector selector;

    //获取一个ServerSocket通道,并初始化通道
    public NioServer init(int port) throws IOException{
        //获取一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        //获取通道管理器
        selector=Selector.open();
        //将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
        //只有当该事件到达时,Selector.select()会返回,不然一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        return this;
    }

    public void listen() throws IOException{
        System.out.println("服务器端启动成功");

        //使用轮询访问selector
        while(true){
            //当有注册的事件到达时,方法返回,不然阻塞。
            selector.select();

            //获取selector中的迭代器,选中项为注册的事件
            Iterator<SelectionKey> ite=selector.selectedKeys().iterator();

            while(ite.hasNext()){
                SelectionKey key = ite.next();
                //删除已选key,防止重复处理
                ite.remove();
                //客户端请求链接事件
                if(key.isAcceptable()){
                    ServerSocketChannel server = (ServerSocketChannel)key.channel();
                    //得到客户端链接通道
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    //向客户端发消息
                    channel.write(ByteBuffer.wrap(new String("send message to client").getBytes()));
                    //在与客户端链接成功后,为客户端通道注册SelectionKey.OP_READ事件。
                    channel.register(selector, SelectionKey.OP_READ);

                    System.out.println("客户端请求链接事件");
                }else if(key.isReadable()){//有可读数据事件
                    //获取客户端传输数据可读取消息通道。
                    SocketChannel channel = (SocketChannel)key.channel();
                    //建立读取数据缓冲器
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    int read = channel.read(buffer);
                    byte[] data = buffer.array();
                    String message = new String(data);

                    System.out.println("receive message from client, size:" + buffer.position() + " msg: " + message);
//                    ByteBuffer outbuffer = ByteBuffer.wrap(("server.".concat(msg)).getBytes());
//                    channel.write(outbuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new NioServer().init(9981).listen();
    }
}

Datagram 通道

Java NIO中的DatagramChannel是一个能收发UDP包的通道。由于UDP是无链接的网络协议,因此不能像其它通道那样读取和写入。它发送和接收的是数据包。

Datagram 通道就做为你们自学的内容。

管道(Pipe)

Java NIO 管道是2个线程之间的单向数据链接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

这里写图片描述

建立管道

经过Pipe.open()方法打开管道。例如:

Pipe pipe = Pipe.open();

向管道写数据

要向管道写数据,须要访问sink通道。像这样:

Pipe.SinkChannel sinkChannel = pipe.sink();

经过调用SinkChannel的write()方法,将数据写入SinkChannel,像这样:

String newData = "New String to write to file..." + System.currentTimeMillis();  
ByteBuffer buf = ByteBuffer.allocate(48);  
buf.clear();  
buf.put(newData.getBytes());  
buf.flip();  
while(buf.hasRemaining()) {  
   <b>sinkChannel.write(buf);</b>  
}

从管道读取数据

从读取管道的数据,须要访问source通道,像这样:

Pipe.SourceChannel sourceChannel = pipe.source();

调用source通道的read()方法来读取数据,像这样:

ByteBuffer buf = ByteBuffer.allocate(48);  
int bytesRead = inChannel.read(buf);

read()方法返回的int值会告诉咱们多少字节被读进了缓冲区。

简单完整实例

//获取管道
Pipe pipe = Pipe.open();
//获取Sink 管道
Pipe.SinkChannel sinkChannel = pipe.sink();
//须要写入数据
String newData = "New String to write to file..." + System.currentTimeMillis();
//新建缓存区
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
//缓存区存放数据
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}
//获取Source 管道
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf2 = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf2);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的调用,首先读取数据到Buffer,而后反转Buffer,接着再从Buffer中读取数据(注:flip:空翻,反转)
    buf.flip();
    //判断是否有剩余(注:Remaining:剩余的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = sourceChannel.read(buf);
}
sourceChannel.close();
sinkChannel.close();

AIO编程

AIO的特色

  • 读完了再通知我

  • 不会加快IO,只是在读完后进行通知

  • 使用回调函数,进行业务处理

AIO的相关代码:

//AsynchronousServerSocketChannel类
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));

使用server上的accept方法

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

CompletionHandler为回调接口,当有客户端accept以后,就作handler中的事情。

NIO与AIO区别

  • NIO是同步非阻塞的,AIO是异步非阻塞的
  • 因为NIO的读写过程依然在应用线程里完成,因此对于那些读写过程时间长的,NIO就不太适合。而AIO的读写过程完成后才被通知,因此AIO可以胜任那些重量级,读写过程长的任务。

参考文献二

1、看图

网上不少IO资料,对新手来讲,越看越晕。根据本身的理解,总结对比了一下BIO、NIO、AIO。

BIO:线程发起IO请求,无论内核是否准备好IO操做,从发起请求起,线程一直阻塞,直到操做完成。以下图:

NIO(reactor模型):线程发起IO请求,当即返回;内核在作好IO操做的准备以后,经过调用注册的回调函数通知线程作IO操做,线程开始阻塞,直到操做完成。以下图:

AIO(proactor模型):线程发起IO请求,当即返回;内存作好IO操做的准备以后,作IO操做,直到操做完成或者失败,经过调用注册的回调函数通知线程作IO操做完成或者失败。以下图:

2、详解

 

一、BIO

     在JDK1.4出来以前,咱们创建网络链接的时候采用BIO模式,须要先在服务端启动一个ServerSocket,而后在客户端启动Socket来对服务端进行通讯,默认状况下服务端须要对每一个请求创建一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,若是没有则会一直等待或者遭到拒绝请求,若是有的话,客户端会线程会等待请求结束后才继续执行。

二、NIO

    NIO自己是基于事件驱动思想来完成的,其主要想解决的是BIO的大并发问题: 在使用同步I/O的网络应用中,若是要同时处理多个客户端请求,或是在客户端要同时和多个服务器进行通信,就必须使用多线程来处理。也就是说,将每个客户端请求分配给一个线程来单独处理。这样作虽然能够达到咱们的要求,但同时又会带来另一个问题。因为每建立一个线程,就要为这个线程分配必定的内存空间(也叫工做存储器),并且操做系统自己也对线程的总数有必定的限制。若是客户端的请求过多,服务端程序可能会由于不堪重负而拒绝客户端的请求,甚至服务器可能会所以而瘫痪。

    NIO基于Reactor,当socket有流可读或可写入socket时,操做系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操做系统。  也就是说,这个时候,已经不是一个链接就要对应一个处理线程了,而是有效的请求,对应一个线程,当链接没有数据时,是没有工做线程来处理的。

   BIO与NIO一个比较重要的不一样,是咱们使用BIO的时候每每会引入多线程,每一个链接一个单独的线程;而NIO则是使用单线程或者只使用少许的多线程,每一个链接共用一个线程。

      NIO的最重要的地方是当一个链接建立后,不须要对应一个线程,这个链接会被注册到多路复用器上面,因此全部的链接只须要一个线程就能够搞定,当这个线程中的多路复用器进行轮询的时候,发现链接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式。

      在NIO的处理方式中,当一个请求来的话,开启线程进行处理,可能会等待后端应用的资源(JDBC链接等),其实这个线程就被阻塞了,当并发上来的话,仍是会有BIO同样的问题。

  HTTP/1.1出现后,有了Http长链接,这样除了超时和指明特定关闭的http header外,这个连接是一直打开的状态的,这样在NIO处理中能够进一步的进化,在后端资源中能够实现资源池或者队列,当请求来的话,开启的线程把请求和请求数据传送给后端资源池或者队列里面就返回,而且在全局的地方保持住这个现场(哪一个链接的哪一个请求等),这样前面的线程仍是能够去接受其余的请求,然后端的应用的处理只须要执行队列里面的就能够了,这样请求处理和后端应用是异步的.当后端处理完,到全局地方获得现场,产生响应,这个就实现了异步处理。

三、AIO

     与NIO不一样,当进行读写操做时,只须直接调用API的read或write方法便可。这两种方法均为异步的,对于读操做而言,当有流可读取时,操做系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操做而言,当操做系统将write方法传递的流写入完毕时,操做系统主动通知应用程序。  便可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。  在JDK1.7中,这部份内容被称做NIO.2,主要在Java.nio.channels包下增长了下面四个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

其中的read/write方法,会返回一个带回调函数的对象,当执行完读取/写入操做后,直接调用回调函数。

BIO是一个链接一个线程。

NIO是一个请求一个线程。

AIO是一个有效请求一个线程。

先来个例子理解一下概念,以银行取款为例: 

  • 同步 : 本身亲自出马持银行卡到银行取钱(使用同步IO时,Java本身处理IO读写);
  • 异步 : 委托一小弟拿银行卡到银行取钱,而后给你(使用异步IO时,Java将IO读写委托给OS处理,须要将数据缓冲区地址和大小传给OS(银行卡和密码),OS须要支持异步IO操做API);
  • 阻塞 : ATM排队取款,你只能等待(使用阻塞IO时,Java调用会一直阻塞到读写完成才返回);
  • 非阻塞 : 柜台取款,取个号,而后坐在椅子上作其它事,等号广播会通知你办理,没到号你就不能去,你能够不断问大堂经理排到了没有,大堂经理若是说还没到你就不能去(使用非阻塞IO时,若是不能读写Java调用会立刻返回,当IO事件分发器会通知可读写时再继续进行读写,不断循环直到读写完成)

Java对BIO、NIO、AIO的支持:

  • Java BIO : 同步并阻塞,服务器实现模式为一个链接一个线程,即客户端有链接请求时服务器端就须要启动一个线程进行处理,若是这个链接不作任何事情会形成没必要要的线程开销,固然能够经过线程池机制改善。

  • Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有I/O请求时才启动一个线程进行处理。

  • Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,

BIO、NIO、AIO适用场景分析:

  • BIO方式适用于链接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4之前的惟一选择,但程序直观简单易理解。

  • NIO方式适用于链接数目多且链接比较短(轻操做)的架构,好比聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

  • AIO方式使用于链接数目多且链接比较长(重操做)的架构,好比相册服务器,充分调用OS参与并发操做,编程比较复杂,JDK7开始支持。

另外,I/O属于底层操做,须要操做系统支持,并发也须要操做系统的支持,因此性能方面不一样操做系统差别会比较明显。

在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操做。

    在比较这两个模式以前,咱们首先的搞明白几个概念,什么是阻塞和非阻塞,什么是同步和异步,同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO操做并等待或者轮询的去查看IO操做是否就绪,而异步是指用户进程触发IO操做之后便开始作本身的事情,而当IO操做已经完成的时候会获得IO完成的通知。而阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操做的就绪状态来采起的不一样方式,说白了是一种读取或者写入操做函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会当即返回一个状态值。

 通常来讲I/O模型能够分为:同步阻塞,同步非阻塞,异步阻塞,异步非阻塞IO

同步阻塞IO:在此种方式下,用户进程在发起一个IO操做之后,必须等待IO操做的完成,只有当真正完成了IO操做之后,用户进程才能运行。JAVA传统的IO模型属于此种方式!

同步非阻塞IO:在此种方式下,用户进程发起一个IO操做之后边可返回作其它事情,可是用户进程须要时不时的询问IO操做是否就绪,这就要求用户进程不停的去询问,从而引入没必要要的CPU资源浪费。其中目前JAVA的NIO就属于同步非阻塞IO。

异步阻塞IO:此种方式下是指应用发起一个IO操做之后,不等待内核IO操做的完成,等内核完成IO操做之后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为何说是阻塞的呢?由于此时是经过select系统调用来完成的,而select函数自己的实现方式是阻塞的,而采用select函数有个好处就是它能够同时监听多个文件句柄,从而提升系统的并发性!

 异步非阻塞IO:在此种模式下,用户进程只须要发起一个IO操做而后当即返回,等IO操做真正的完成之后,应用程序会获得IO操做完成的通知,此时用户进程只须要对数据进行处理就行了,不须要进行实际的IO读写操做,由于真正的IO读取或者写入操做已经由内核完成了。

相关文章
相关标签/搜索