java NIO原理及实例

一、reactor(反应器)模式html

  使用单线程模拟多线程,提升资源利用率和程序的效率,增长系统吞吐量。下面例子比较形象的说明了什么是反应器模式:java

  一个老板经营一个饭店,react

  传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(至关于一个链接一个线程)服务器

  后来客人愈来愈多,须要的服务员愈来愈多,资源条件不足以再请更多的服务员了,传统模式已经不能知足需求。老板之因此为老板天然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不作事)。网络

  因而乎就让服务员在客人点菜的时候,去为其余客人服务,当客人菜点好后再招呼服务员便可。 --反应器(reactor)模式诞生了多线程

  饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~^_^app

 

二、NIO中的重要概念 通道、缓冲区、选择器异步

  通道:相似于流,可是能够异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据老是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。socket

  通道类型:  ide

    • FileChannel:从文件中读写数据。  
    • DatagramChannel:能经过UDP读写网络中的数据。  
    • SocketChannel:能经过TCP读写网络中的数据。  
    • ServerSocketChannel:能够监听新进来的TCP链接,像Web服务器那样。对每个新进来的链接都会建立一个SocketChannel。  

  FileChannel比较特殊,它能够与通道进行数据交互, 不能切换到非阻塞模式,套接字通道能够切换到非阻塞模式;

  缓冲区 - 本质上是一块能够存储数据的内存,被封装成了buffer对象而已!

  缓冲区类型:

    • ByteBuffer  
    • MappedByteBuffer  
    • CharBuffer  
    • DoubleBuffer  
    • FloatBuffer  
    • IntBuffer  
    • LongBuffer  
    • ShortBuffer  

  经常使用方法:

    •   allocate() - 分配一块缓冲区  
    •   put() -  向缓冲区写数据
    •   get() - 向缓冲区读数据  
    •   filp() - 将缓冲区从写模式切换到读模式  
    •     clear() - 从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即便有部分数据没有读,也会被遗忘;  
    •       compact() - 从读数据切换到写模式,数据不会被清空,会将全部未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据以后写数据
    •   mark() - 对position作出标记,配合reset使用
    •       reset() - 将position置为标记值    

缓冲区的一些属性:

    •     capacity - 缓冲区大小,不管是读模式仍是写模式,此属性值不会变;
    •     position - 写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1

        切换到读模式时,position会被置为0,表示当前读的位置

    •     limit - 写模式下,limit 至关于capacity 表示最多能够写多少数据,切换到读模式时,limit 等于原先的position,表示最多能够读多少数据。

  选择器:至关于一个观察者,用来监听通道感兴趣的事件,一个选择器能够绑定多个通道;

   通道向选择器注册时,须要指定感兴趣的事件,选择器支持如下事件:

    • SelectionKey.OP_CONNECT
    • SelectionKey.OP_ACCEPT
    • SelectionKey.OP_READ
    • SelectionKey.OP_WRITE  

   若是你对不止一种事件感兴趣,那么能够用“位或”操做符将常量链接起来,以下:

     int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 

   通道向选择器注册时,会返回一个 SelectionKey对象,具备以下属性

    • interest集合
    • ready集合  
    • Channel  
    • Selector
    • 附加的对象(可选)  

  用“位与”操做interest 集合和给定的SelectionKey常量,能够肯定某个肯定的事件是否在interest 集合中。

int interestSet = selectionKey.interestOps();  
 
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

  ready 集合是通道已经准备就绪的操做的集合。在一次选择(Selection)以后,你会首先访问这个ready set。Selection将在下一小节进行解释。能够这样访问ready集合:
  int readySet = selectionKey.readyOps();

   也可使用如下四个方法获取已就绪事件,返回值为boolean:

selectionKey.isAcceptable();  
selectionKey.isConnectable();  
selectionKey.isReadable();  
selectionKey.isWritable();  

   能够将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法以下:

selectionKey.attach(theObject);  
Object attachedObj = selectionKey.attachment();  

   能够经过选择器的select方法获取是否有就绪的通道;

    • int select()  
    • int select(long timeout)  
    • int selectNow()

  返回值表示上次执行select以后,就绪通道的个数。 

  能够经过selectedKeySet获取已就绪的通道。返回值是SelectionKey 的集合,处理完相应的通道以后,须要removed 由于Selector不会本身removed

 

  select阻塞后,能够用wakeup唤醒;执行wakeup时,若是没有阻塞的select  那么执行完wakeup后下一个执行select就会当即返回。

  调用close() 方法关闭selector

 下面是一个简单的实例代码,帮助理解上面的内容:

package com.pt.nio;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    public int id = 100001;
    public int bufferSize = 2048;
    @Override
    public void run() {
        // TODO Auto-generated method stub
        init();
    }

    public void init() {
        try {
            // 建立通道和选择器
            ServerSocketChannel socketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(
                    InetAddress.getLocalHost(), 4700);
            socketChannel.socket().bind(inetSocketAddress);
            // 设置通道非阻塞 绑定选择器
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach(
                    id++);
            System.out.println("Server started .... port:4700");
            listener(selector);

        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    public void listener(Selector in_selector) {
        try {
            while (true) {
                Thread.sleep(1*1000);
                in_selector.select(); // 阻塞 直到有就绪事件为止
                Set<SelectionKey> readySelectionKey = in_selector
                        .selectedKeys();
                Iterator<SelectionKey> it = readySelectionKey.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    // 判断是哪一个事件
                    if (selectionKey.isAcceptable()) {// 客户请求链接
                        System.out.println(selectionKey.attachment()
                                + " - 接受请求事件");
                        // 获取通道 接受链接,
                        // 设置非阻塞模式(必须),同时须要注册 读写数据的事件,这样有消息触发时才能捕获
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
                                .channel();
                        serverSocketChannel
                                .accept()
                                .configureBlocking(false)
                                .register(
                                        in_selector,
                                        SelectionKey.OP_READ
                                                | SelectionKey.OP_WRITE).attach(id++);
                        System.out
                                .println(selectionKey.attachment() + " - 已链接");

                        // 下面这种写法是有问题的 不该该在serverSocketChannel上面注册
                        /*
                         * serverSocketChannel.configureBlocking(false);
                         * serverSocketChannel.register(in_selector,
                         * SelectionKey.OP_READ);
                         * serverSocketChannel.register(in_selector,
                         * SelectionKey.OP_WRITE);
                         */
                    }
                    if (selectionKey.isReadable()) {// 读数据
                        System.out.println(selectionKey.attachment()
                                + " - 读数据事件");
                        SocketChannel clientChannel=(SocketChannel)selectionKey.channel();
                        ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize);
                        clientChannel.read(receiveBuf);
                        System.out.println(selectionKey.attachment()
                                + " - 读取数据:" + getString(receiveBuf));
                    }
                    if (selectionKey.isWritable()) {// 写数据
                        System.out.println(selectionKey.attachment()
                                + " - 写数据事件");
                        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize);
                        String sendText = "hello\n";
                        sendBuf.put(sendText.getBytes());
                        sendBuf.flip();        //写完数据后调用此方法
                        clientChannel.write(sendBuf);
                    }
                    if (selectionKey.isConnectable()) {
                        System.out.println(selectionKey.attachment()
                                + " - 链接事件");
                    }
                    // 必须removed 不然会继续存在,下一次循环还会进来,
                    // 注意removed 的位置,针对一个.next() remove一次
                    it.remove(); 
                }
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("Error - " + e.getMessage());
            e.printStackTrace();
        }

    }
    /**
     * ByteBuffer 转换 String
     * @param buffer
     * @return
     */
    public static String getString(ByteBuffer buffer)
    {
        String string = "";
        try
        {
            for(int i = 0; i<buffer.position();i++){
                string += (char)buffer.get(i);
            }
            return string;
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }
}
NIO服务器端
package com.pt.bio;

import java.io.*;
import java.net.*;

public class BioServer implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("Hello Server!!");

        try {
            ServerSocket server = null;
            try {
                server = new ServerSocket(4700);
                // 建立一个ServerSocket在端口4700监听客户请求
            } catch (Exception e) {
                System.out.println("can not listen to:" + e);
                // 出错,打印出错信息
            }
            Socket socket = null;
            try {
                socket = server.accept();
                // 使用accept()阻塞等待客户请求,有客户
                // 请求到来则产生一个Socket对象,并继续执行
            } catch (Exception e) {
                System.out.println("Error." + e);
                // 出错,打印出错信息
            }
            String line;
            BufferedReader is = new BufferedReader(new InputStreamReader(
                    socket.getInputStream()));
            // 由Socket对象获得输入流,并构造相应的BufferedReader对象
            
            // 由Socket对象获得输出流,并构造PrintWriter对象
//            BufferedReader sin = new BufferedReader(new InputStreamReader(
//                    System.in));
            // 由系统标准输入设备构造BufferedReader对象
            System.out.println("Client:" + is.readLine());
            PrintWriter os = new PrintWriter(socket.getOutputStream());
            // 在标准输出上打印从客户端读入的字符串
            line = "hello";
            // 从标准输入读入一字符串
//            while (!line.equals("bye")) {
                // 若是该字符串为 "bye",则中止循环
                os.println(line);
                // 向客户端输出该字符串
                os.flush();
                // 刷新输出流,使Client立刻收到该字符串
//                System.out.println("Server:" + line);
                // 在系统标准输出上打印读入的字符串
//                System.out.println("Client:" + is.readLine());
                // 从Client读入一字符串,并打印到标准输出上
//                line = sin.readLine();
                // 从系统标准输入读入一字符串
//            } // 继续循环
//            os.close(); // 关闭Socket输出流
            is.close(); // 关闭Socket输入流
            socket.close(); // 关闭Socket
            server.close(); // 关闭ServerSocket
        } catch (Exception e) {
            System.out.println("Error." + e);
            // 出错,打印出错信息
        }

    }

}
BIO服务器端
package com.pt;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import org.junit.Test;

import com.pt.bio.BioServer;
import com.pt.nio.Reactor;

public class TestReactor {

    @Test
    public void testConnect() throws Exception{
        Socket socket=new Socket("192.168.82.35",4700);//BIO 阻塞
        System.out.println("链接成功");
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         
        //下面这种写法,不用关闭客户端,服务器端也是能够收到的
        {
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println("hi");
            printWriter.flush();
        }
        //这种写法必须关闭客户端,服务器端才能够收到 NIO不用
        {
//        socket.getOutputStream().write(new byte[]{'h','i'});
//        socket.getOutputStream().flush();
        //必须关闭BIO服务器才能收到消息.NIO服务器不须要关闭
        //socket.close();
        }
        byte[] buf = new byte[2048];
        System.out.println("准备读取数据~~");
        
        while(true){
            try {
                //两种读取数据方式
                int count = socket.getInputStream().read(buf);        //会阻塞
                //String readFromServer = bufferedReader.readLine();//能够读取到数据 会阻塞,直到碰见\n
                //System.out.println("方式二: 读取数据" + readFromServer);    
                System.out.println("方式一: 读取数据" + new String(buf) + " count = " + count);
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            //break;
        }
        
    }
    
    @Test 
    public void testNioServer(){
        Thread server = new Thread(new Reactor());
        server.start();

        while(true){
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    @Test
    public void testBioServer(){
        Thread server = new Thread(new BioServer());
        server.start();

        while(true){
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}
BIO客户端及测试类

其中 testNioServer()方法,是启动NIO服务器端;

testBioServer()方法是启动BIO服务器端

testConnect()是BIO的一个链接

基于NIO实现的时钟服务器:http://www.cnblogs.com/tengpan-cn/p/6529628.html

 

 

一篇写的比较详细的JAVA NIO的文章:http://www.iteye.com/magazines/132-Java-NIO

相关文章
相关标签/搜索