Socket网络通讯之NIO

Socket网络通讯之NIOjava

NIO:new io ,java1.4开始推出的可非阻塞IO。数据库

java.nio 包,可解决BIO阻塞的不足 但比BIO学习、使用复杂。网络

能够以阻塞、非阻塞两种方式工做。app

能够在非阻塞模式下,能够用少许(甚至一个)线程处理大量IO链接。框架

Java7推出了 Nio.2  (又称AIO,异步IO)。异步

1、NIO工做流程以下图:socket

流程图如上所示,要理解NioSocket的使用必须先理解三个概念:Selector,Channel和Buffer。举个例子。大学时有人卖电话卡,提供送货上门服务,只要有人打电话,他就送过去、收钱在回去,而后等下一个电话,这就至关于普通的Socket处理请求的模式。若是请求不是不少,这是没有问题的。而像如今的电商配送模式——送快递就相似于NioSocket。快递并不会一件一件的送,而是将不少件货一块儿拿去送,并且在中转站都有专门的分拣员负责按配送范围把货物分给不一样的送货员,这样效率就提升了不少。Selector就是中转战的分拣员,Channel就是送货员(或者开往某个区域的配货车),Buffer就是所要送的货物。
NioSocket使用中首先要建立ServerSocketChannel,而后注册Selector,接下来就能够用Selector接收请求并处理了。 
ServerSocketChannel可使用本身的静态工程方法open建立。每一个ServerSocketChannel对应一个ServerSocket,能够调用其socket方法来获取,不过若是直接使用获取到ServerSocket来监听请求,那仍是原来的处理模式,通常使用获取到的ServerSocket来绑定端口。ServerSocketChannel能够经过configureBlocking方法来设置是否采用阻塞模式,若是要采用非阻塞模式能够用configureBlocking(false)来设置,设置了非阻塞模式以后就能够调用register方法注册Selector来使用了(阻塞模式不可使用Selector)。 
Selector可使用本身的静态工程方法open建立,建立后经过Channel的register方法注册到ServerSocketChannel或者SocketChannel上,注册完以后Selector就能够经过select方法来等等请求,select方法有一个long类型的参数,表明最长等待时间,若是在这段时间里接收到了相应操做的请求则返回能够处理的请求的数量,不然在超时后返回0,程序继续往下走,若是传入的参数为0或者调用无参数的重载方法,select方法会采用阻塞模式直到有相应操做的请求出现。当接收到请求后Selector调用selectedKeys方法返回SelectedKey的集合。 
selectedKey保存了处理当前请求的Channel和Selector,而且提供了不一样的操做类型。Channel在注册Selector的时候能够经过register的第二个参数选择特定的操做,这里的操做就是在selectedKey中定义的,一共有4种: 
SelectionKey.OP_ACCEPT 
SelectionKey.OP_CONNECT 
SelectionKey.OP_READ 
SelectionKey.OP_WRITE 
分别表示接收请求操做、链接操做、读操做和写操做,只有在register方法中注册了相应的操做Selector才会关心相应类型操做的请求。 
Channel和Selector并无谁属于谁的关系,就像数据库里的多对多的关系,不过Selecor这个分拣员分拣的更细,它能够按不一样类型来分拣,分拣后的结果保存在SelectionKey中,能够分别经过SelectionKey的channel方法和selector方法来获取对应的Channel和Selector,并且还能够经过isAcceptable、isConnectable、isReadable和isWritable方法来判断是什么类型的操做。 
NioSocket中服务端的处理过程能够分为5步: 
一、建立ServerSocketChannel并设置相应参数 。
二、建立Selector并注册到ServerSocketChannel上 。
三、调用Selector的select方法等待请求 。
四、Selector接收到请求后使用selectionKeys返回SelectionKey集合 。
五、使用SelectionKey获取到Channel、Selector和操做类型并进行具体操做。ide

下面具体说说Selector,Channel和Buffer的用法。学习

 2、Selector 选择器 :this

Selector 选择器 :非阻塞模式下,一个选择器可检测多个SelectableChannel,得到为读写等操做准备好的通道。就不须要咱们用循环去判断了。经过Selector,一个线程就能够处理多个Channel,可极大减小线程数。 用cpu核心数量的线程,充分利用cpu资源,又减小线程切换。

Selector 用法:1,建立Selector。Selector selector = Selector.open();

       2,将要交给Selector检测的SelectableChannel注册进来。

          (1)channel.configureBlocking(false);   // 注意:必定要设为非阻塞模式

          (2)SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

          channel.register方法的第二个参数指定要selector帮忙监听的就绪操做:SelectionKey.OP_CONNECT(可链接);SelectionKey.OP_ACCEPT(可接受);SelectionKey.OP_READ(可读);SelectionKey.OP_WRITE(可写)。

       3,经过Selector来选择就绪的Channel,有三个select方法。int n = selector.select();

        (1) int select() //阻塞直到有就绪的Channel。
        (2)int select(long timeout) //阻塞最长多久。
        (3)int selectNow() //不阻塞。 

          三个方法返回值:就绪的Channel数量。 
          注意:select()方法返回当前的就绪数量。
          例:第一次select返回1;第二次select,又一个channel就绪,若是第一个就绪的channel还未被处理,则此时就绪的channel是2个,会返回2。在用线程池异步处理任务时需特别当心,重复选择!

       4,得到就绪的SelectionKey集合(当有就绪的Channel时)。

        Set<SelectionKey> selectedKeys = selector.selectedKeys();

       5,处理selectedKeys set。  

        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
        // a connection was established with a remote server.
        } else if (key.isReadable()) {
        // a channel is ready for reading
        } else if (key.isWritable()) {
        // a channel is ready for writing
        }
        keyIterator.remove(); //处理了,必定要从selectedKey集中移除
        }

 3、Channel 通道:数据的来源或去向目标

Java NIO: Channels read data into Buffers, and Buffers write data into Channels。

一、Channel的实现

  FileChannel 文件通道
  DatagramChannel UDP协议的通道
  SocketChannel 一般通道
  ServerSocketChannel 服务通道

二、各Channel的API方法

  open():建立通道
  read(Buffer):从通道中读数据放入到buffer
  write(Buffer):将buffer中的数据写给通道

3、Buffer   缓冲区:数据的临时存放区

Buffer类型:ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer

Buffer的基本使用步骤:

(1)调用xxxBuffer.allocate(int)建立Buffer
(2)调用put方法往Buffer中写数据
(3)调用buffer.flip()将buffer转为读模式
(4)读取buffer中的数据
(5)Call buffer.clear() or buffer.compact()

一、Buffer的操做API 

(1)调用xxxBuffer.allocate(int)建立Buffer
  ByteBuffer buf = ByteBuffer.allocate(48);
  CharBuffer buf = CharBuffer.allocate(1024);
(2)往Buffer中写数据
  int bytesRead = inChannel.read(buf); //read into buffer.
  buf.put(127);
(3)调用buffer.flip()将buffer转为读模式
  buf.flip(); // 转为读模式,position变为0
(4)读取buffer中的数据
  //read from buffer into channel.
  int bytesWritten = inChannel.write(buf);
  byte aByte = buf.get();
(5)读完后,调用clear()或compact()为下次写作好准备
  buf.clear(); //position=0 limit = capacity
  buf.compact(); //整理,将未读的数据移动到头部

以下所示为NioServer和NioClient的代码

public class NioServer {
    private static Charset charset = Charset.forName("UTF-8");
    private static CharsetDecoder decoder = charset.newDecoder();
    public static void main(String[] args) throws IOException {
        int port = 1104;
        // 极少的线程
        int threads = 3;
        ExecutorService tpool = Executors.newFixedThreadPool(threads);
        // 一、获得一个selector
        Selector selector = Selector.open();
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(port));
            // 2 注册到selector
            // 要非阻塞
            ssc.configureBlocking(false);
            // ssc向selector 注册,监听链接到来。
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            // 链接计数
            int connectionCount = 0;
            // 三、循环选择就绪的通道
            while (true) {
                // 阻塞等待就绪的事件
                int readyChannels = selector.select();
                // 由于select()阻塞能够被中断
                if (readyChannels == 0) {
                    continue;
                }

                // 取到就绪的key集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isAcceptable()) {
                        // a connection was accepted by a ServerSocketChannel.
                        ServerSocketChannel sssc = (ServerSocketChannel) key.channel();
                        // 接受链接
                        SocketChannel cc = sssc.accept();
                        // 请selector 帮忙检测数据到了
                        // 设置非阻塞
                        cc.configureBlocking(false);
                        // 向selector 注册
                        cc.register(selector, SelectionKey.OP_READ, ++connectionCount);
                    } else if (key.isConnectable()) {
                        // a connection was established with a remote server.
                    } else if (key.isReadable()) {
                        // a channel is ready for reading
                        // 四、读取数据进行处理
                        // 交各线程池去处理
                        tpool.execute(new SocketReadProcess(key));
                        // 取消一下注册,防止线程池处理不及时,没有注销掉
                        key.cancel();
                    } else if (key.isWritable()) {
                        // a channel is ready for writing
                    }
                    keyIterator.remove(); // 处理了,必定要从selectedKey集中移除
                }
            }
        }
    }
    
    static class SocketReadProcess implements Runnable {
        SelectionKey key;
        public SocketReadProcess(SelectionKey key) {
            super();
            this.key = key;
        }
        @Override
        public void run() {
            try {
                System.out.println("链接" + key.attachment() + "发来:" + readFromChannel());
                // 若是链接不须要了,就关闭
                key.channel().close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        private String readFromChannel() throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            int bfsize = 1024;
            ByteBuffer rbf = ByteBuffer.allocateDirect(bfsize);
            // 定义一个更大的buffer
            ByteBuffer bigBf = null;
            // 读的次数计数
            int count = 0;
            while ((sc.read(rbf)) != -1) {
                count++;
                ByteBuffer temp = ByteBuffer.allocateDirect(bfsize * (count + 1));
                if (bigBf != null) {
                    // 将buffer有写转为读模式
                    bigBf.flip();
                    temp.put(bigBf);
                }
                bigBf = temp;
                // 将此次读到的数据放入大buffer
                rbf.flip();
                bigBf.put(rbf);
                // 为下次读,清理。
                rbf.clear();
                // 读出的是字节,要转为字符串
            }
            if (bigBf != null) {
                // 转为读模式
                bigBf.flip();
                // 转成CharBuffer,再转为字符串。
                return decoder.decode(bigBf).toString();
            }
            return null;
        }
    }
}
NioServer
public class NioClient {
    static Charset charset = Charset.forName("UTF-8");
    public static void main(String[] args) {
        try (SocketChannel sc = SocketChannel.open();) {
            // 链接 会阻塞
            boolean connected = sc.connect(new InetSocketAddress("localhost", 1104));
            System.out.println("connected=" + connected);
            //
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入:");
            String mess = scanner.nextLine();
            ByteBuffer bf = ByteBuffer.wrap(mess.getBytes(charset));
            while (bf.hasRemaining()) {
                int writedCount = sc.write(bf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
NioClient

如上代码所示,先执行NioServer再执行NioClient,用debug模式分步执行,多个客户端链接的时候,会发现不会阻塞。

 固然对于NIO通讯,还可使用non-blockin模式和更加稳定的java开源框架Netty和MINA。

相关文章
相关标签/搜索