nio服务器端

使用异步处理IO的方式,使用一个线程,处理大量的连接。java

先对NIO原理和通讯模型作一些了解。ios

1. 由一个专门的线程来处理全部的 IO 事件,并负责分发。 
2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。 
3. 线程通信:线程之间经过 wait,notify 等方式通信。保证每次上下文切换都是有意义的。减小无谓的线程切换。 服务器

Java NIO的服务端只需启动一个专门的线程来处理全部的 IO 事件,这种通讯模型是怎么实现的呢?呵呵,咱们一块儿来探究它的奥秘吧。java NIO采用了双向通道(channel)进行数据传输,而不是单向的流(stream),在通道上能够注册咱们感兴趣的事件。一共有如下四种事件:异步

 

事件名 对应值
服务端接收客户端链接事件 SelectionKey.OP_ACCEPT(16)
客户端链接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

服务端和客户端各自维护一个管理通道的对象,咱们称之为selector,该对象能检测一个或多个通道 (channel) 上的事件。咱们以服务端为例,若是服务端的selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,若是访问selector时发现有感兴趣的事件到达,则处理这些事件,若是没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。下面是我理解的java NIO的通讯模型示意图:socket

package priv.lee.paradise.nioserver.listen;


import priv.lee.paradise.nioserver.distribute.DoGetOperater;
import priv.lee.paradise.nioserver.distribute.DoPostOperater;
import priv.lee.paradise.nioserver.util.SocketUtil;

import java.nio.channels.SocketChannel;

/**
 * Created by li on 2016/12/12.
 * 当所监听的端口有链接进来的时候,可经过端口监听器通知观察者进行下一步操做。
 */
class ListenerObserver {

    /**
     * 当有新链接进来的时候,会通知该类调用静态方法。
     * 每传入一个新的socketChannel会新建一个线程对其进行处理。
     *
     * @param socketChannel 客户端和服务器的嵌套字。
     */
    static void acceptNotify(SocketChannel socketChannel) {
        String requestInfo = SocketUtil.getRequest(socketChannel);
        System.out.println(requestInfo);
        if (requestInfo.contains("GET")) {
            DoGetOperater doGetOperater = new DoGetOperater(socketChannel, requestInfo);
            doGetOperater.doGet();
        }
        if (requestInfo.contains("POST")) {
            DoPostOperater doPostOperater = new DoPostOperater(requestInfo, socketChannel);
            doPostOperater.doPost();
        }
    }
}
package priv.lee.paradise.nioserver.listen;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Created by li on 2016/12/11.
 * 使用nio建立的端口监听器。本类再也不使用静态方法。
 */
public class PortListener {
    private int port;
    private Selector selector;

    /**
     * 初始化监听器,要建立监听器必须传入端口号
     * param:port 端口号。
     * return:null
     */
    public PortListener(int port) {
        this.port = port;
        init();
    }

    /**
     * 初始化端口监听器,获取通道、注册事件。
     * param:null;
     * return:null
     */
    private void init() {
        ServerSocketChannel serverSocketChannel = getServerSocketChannel();
        selector = getSelector();
        registerEvent(serverSocketChannel, selector, SelectionKey.OP_ACCEPT);

    }

    /**
     * 开始监听事件。
     */
    public void startListening() {
        System.out.println("开始监听" + port + "号端口");
        while (!Thread.interrupted()) {
            handleEvent();
        }
    }

    /**
     * 但监听器监听到注册过的事件到来的时候,
     * selector.select();为线程阻塞方法,当感兴趣的事件到来的时候才会触发这个方法。
     */
    private void handleEvent() {
        try {
            selector.select();
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                if (key.isAcceptable()) {
                    handleAcceptableEvent(key);
                } else if (key.isReadable()) {
                    handleReadableEvent(key);
                }
                iterator.remove();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 处理一个可读的事件。
     *
     * @param key 可读事件
     */
    private void handleReadableEvent(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ListenerObserver.acceptNotify(socketChannel);



    }


    /**
     * 处理Acceptable事件
     *
     * @param key 以注册是的事件触发。并向选择器注册读取事件。
     */
    private void handleAcceptableEvent(SelectionKey key) {
        ServerSocketChannel serverSocketChannel;
        try {
            serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取一个ServerSocketChannel对象,并对其进行一些配置。
     * param:null。
     * return:ServerSocketChannel
     */

    private ServerSocketChannel getServerSocketChannel() {
        ServerSocketChannel serverSocketChannel = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));

        } catch (Exception e) {
            e.printStackTrace();

        }
        return serverSocketChannel;
    }

    /**
     * 获取一个选择器。
     * param:null
     * return Selector.
     */

    private Selector getSelector() {
        Selector selector = null;
        try {
            selector = Selector.open();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return selector;
    }

    /**
     * 为通道注册选择器要监听的事件。
     *
     * @param serverSocketChannel 待监测的通道。
     * @param selector            选择器
     * @param opAccept            待监听的事件选项(int)。
     */


    private void registerEvent(ServerSocketChannel serverSocketChannel, Selector selector, int opAccept) {
        try {
            serverSocketChannel.register(selector, opAccept);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
相关文章
相关标签/搜索