架构设计:系统间通讯(4)——IO通讯模型和JAVA实践 中篇

四、多路复用IO模型

在“上篇”文章中,咱们已经提到了使用多线程解决高并发场景的问题所在,这篇文章咱们开始 java

4-一、现实场景

咱们试想一下这样的现实场景: linux

一个餐厅同时有100位客人到店,固然到店后第一件要作的事情就是点菜。可是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着惟一的一本菜单等待客人进行服务。 git

那么最笨(可是最简单)的方法是(方法A),不管有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客 人,而后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。而后是第二位客人。。。。而后是第三位客人。很明显,只 有脑壳被门夹过的老板,才会这样设置服务流程。由于随后的80位客人,再等待超时后就会离店(还会给差评)。 github

因而还有一种办法(方法B),老板立刻新雇佣99名服务员,同时印制99本新的菜单。每一名服务员手持一本菜单 负责一位客人(关键不仅在于服务员,还在于菜单。由于没有菜单客人也没法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(固然为了更高效,后堂厨师最 好也有100名)。这样每一位客人享受的就是VIP服务咯,固然客人不会走,可是人力成本但是一个大头哦(亏死你)。 apache

另一种办法(方法C),就是改进点菜的方式,当客人到店后,本身申请一本菜单。想好本身要点的才后,就呼叫服 务员。服务员站在本身身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并非每一份菜单记录好之后,都要交给后堂厨师。服务员能够记录号多 份菜单后,同时交给厨师就好了。那么这种方式,对于老板来讲人力成本是最低的;对于客人来讲,虽然再也不享受VIP服务而且要进行必定的等待,可是这些都是 可接受的;对于服务员来讲,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。 windows

若是您是老板,您会采用哪一种方式呢? 设计模式

  • 到店状况:并发量。到店状况不理想时,一个服务员一本菜单,固然是足够了。因此不一样的老板在不一样的场合下,将会灵活选择服务员和菜单的配置。
  • 客人:客户端请求
  • 点餐内容:客户端发送的实际数据
  • 老板:操做系统
  • 人力成本:系统资源
  • 菜单:文件状态描述符。操做系统对于一个进程可以同时持有的文件状态描述符的个数是有限制的,在linux系统中$ulimit -n查看这个限制值,固然也是能够(而且应该)进行内核参数调整的。
  • 服务员:操做系统内核用于IO操做的线程(内核线程)
  • 厨师:应用程序线程(固然厨房就是应用程序进程咯)
  • 餐单传递方式:包括了阻塞式和非阻塞式两种
  • 方法A:阻塞式/非阻塞式 同步IO
  • 方法B:使用线程进行处理的 阻塞式/非阻塞式 同步IO
  • 方法C:阻塞式/非阻塞式 多路复用IO

4-二、典型的多路复用IO实现

目前流程的多路复用IO实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较: 数组

IO模型 相对性能 关键思路 操做系统 JAVA支持状况
select 较高 Reactor windows/Linux 支持,Reactor模式(反应器设计模式)。Linux操做系统的 kernels 2.4内核版本以前,默认使用select;而目前windows下对同步IO的支持,都是select模型
poll 较高 Reactor Linux Linux下的JAVA NIO框架,Linux kernels 2.6内核版本以前使用poll进行支持。也是使用的Reactor模式
epoll Reactor/Proactor Linux Linux kernels 2.6内核版本及之后使用epoll进行支持;Linux kernels 2.6内核版本以前使用poll进行支持;另一定注意,因为Linux下没有Windows下的IOCP技术提供真正的 异步IO 支持,因此Linux下使用epoll模拟异步IO
kqueue Proactor Linux 目前JAVA的版本不支持

多路复用IO技术最适用的是“高并发”场景,所谓高并发是指1毫秒内至少同时有上千个链接请求准备好。其余状况下多路复用IO技术发挥不出来它的优点。另外一方面,使用JAVA NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,因此实际应用中,须要根据本身的业务需求进行技术选择。 缓存

五、JAVA对多路复用IO的支持

这里写图片描述

5-一、重要概念:Channel

通道,被创建的一个应用程序和操做系统交互事件、传递内容的渠道(注意是链接到操做系统)。一个通道会有一个专属的文件状态描述符。那么既然是和操做系统进行内容的传递,那么说明应用程序能够经过通道读取数据,也能够经过通道向操做系统写数据。 服务器

JDK API中的Channel的描述是:

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.

JAVA NIO 框架中,自有的Channel通道包括:

这里写图片描述

  • 全部被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。如上图所示

  • ServerSocketChannel:应用服务器程序的监听通道。只有经过这个通道,应用程序才能向操做系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。

  • ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到 服务器IP:端口的通讯链接。

  • DatagramChannel:UDP 数据报文的监听通道。

5-二、重要概念:Buffer

数据缓存区:在JAVA NIO 框架中,为了保证每一个通道的数据读写速度JAVA NIO 框架为每一种须要支持数据读写的通道集成了Buffer的支持。

这句话怎么理解呢?例如ServerSocketChannel通道它只支持对OP_ACCEPT事件的监听,因此它是不能直接进行网络数据内容的读写的。因此ServerSocketChannel是没有集成Buffer的。

Buffer有两种工做模式:写模式和读模式。在读模式下,应用程序只能从Buffer中读取数据,不能进行写操做。可是在写模式下,应用程序是能够进行读操做的,这就表示可能会出现脏读的状况。因此一旦您决定要从Buffer中读取数据,必定要将Buffer的状态改成读模式。

以下图:

这里写图片描述

  • position:缓存区目前这在操做的数据块位置
  • limit:缓存区最大能够进行操做的位置。缓存区的读写状态正式由这个属性控制的。
  • capacity:缓存区的最大容量。这个容量是在缓存区建立时进行指定的。因为高并发时通道数量每每会很庞大,因此每个缓存区的容量最好不要过大。

    在下文JAVA NIO框架的代码实例中,咱们将进行Buffer缓存区操做的演示。

5-三、重要概念:Selector

Selector的英文含义是“选择器”,不过根据咱们详细介绍的Selector的岗位职责,您能够把它称之为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。

  • 事件订阅和Channel管理:
    应用程序将向Selector对象注册须要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护 一个“已经注册的Channel”的容器。如下代码来自WindowsSelectorImpl实现类中,对已经注册的Channel的管理容器:
// Initial capacity of the poll array private final int INIT_CAP = 8; // Maximum number of sockets for select(). // Should be INIT_CAP times a power of 2 private final static int MAX_SELECTABLE_FDS = 1024; // The list of SelectableChannels serviced by this Selector. Every mod // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll // array, where the corresponding entry is occupied by the wakeupSocket private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  • 轮询代理:
    应用层再也不经过阻塞模式或者非阻塞模式直接询问操做系统“事件有没有发生”,而是由Selector代其询问。

  • 实现不一样操做系统的支持:
    以前已经提到过,多路复用IO技术 是须要操做系统进行支持的,其特色就是操做系统能够同时扫描同一个端口上不一样网络链接的时间。因此做为上层的JVM,必需要为 不一样操做系统的多路复用IO实现 编写不一样的代码。一样我使用的测试环境是Windows,它对应的实现类是sun.nio.ch.WindowsSelectorImpl:

这里写图片描述

5-四、JAVA NIO 框架简要设计分析

经过上文的描述,咱们知道了多路复用IO技术是操做系统的内核实现。在不一样的操做系统,甚至同一系列操做系统的版本中所实现的多路复用IO技术都是不同的。那么做为跨平台的JAVA JVM来讲如何适应多种多样的多路复用IO技术实现呢?面向对象的威力就显现出来了:不管使用哪一种实现方式,他们都会有“选择器”、“通道”、“缓存”这几个操做要素,那么能够为不一样的多路复用IO技术建立一个统一的抽象组,而且为不一样的操做系统进行具体的实现。JAVA NIO中对各类多路复用IO的支持,主要的基础是java.nio.channels.spi.SelectorProvider抽象类,其中的几个主要抽象方法包括:

  • public abstract DatagramChannel openDatagramChannel():建立和这个操做系统匹配的UDP 通道实现。

  • public abstract AbstractSelector openSelector():建立和这个操做系统匹配的NIO选择器,就像上文所述,不一样的操做系统,不一样的版本所默认支持的NIO模型是不同的。

  • public abstract ServerSocketChannel openServerSocketChannel():建立和这个NIO模型匹配的服务器端通道。

  • public abstract SocketChannel openSocketChannel():建立和这个NIO模型匹配的TCP Socket套接字通道(用来反映客户端的TCP链接)

因为JAVA NIO框架的整个设计是很大的,因此咱们只能还原一部分咱们关心的问题。这里咱们以JAVA NIO框架中对于不一样多路复用IO技术的选择器 进行实例化建立的方式做为例子,以点窥豹观全局:

这里写图片描述

很明显,不一样的SelectorProvider实现对应了不一样的 选择器。由具体的SelectorProvider实现进行建立。另外说明一下,实际上netty底层也是经过这个设计得到具体使用的NIO模型,咱们后 文讲解Netty时,会讲到这个问题。如下代码是Netty 4.0中NioServerSocketChannel进行实例化时的核心代码片断:

private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel();
        } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e);
        }
    }

5-五、JAVA实例

下面,咱们使用JAVA NIO框架,实现一个支持多路复用IO的服务器端(实际上客户端是否使用多路复用IO技术,对整个系统架构的性能提高相关性不大):

package testNSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer1 { static {
        BasicConfigurator.configure();
    } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer1.class); public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open(); //注意、服务器通道只能注册SelectionKey.OP_ACCEPT事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); try { while(true) { //若是条件成立,说明本次询问selector,并无获取到任何准备好的、感兴趣的事件 //java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。 if(selector.select(100) == 0) { //================================================ // 这里视业务状况,能够作一些然并卵的事情 //================================================ continue;
                } //这里就是本次询问操做系统,所获取到的“所关心的事件”的事件类型(每个通道都是独立的) Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator(); while(selecionKeys.hasNext()) {
                    SelectionKey readyKey = selecionKeys.next(); //这个已经处理的readyKey必定要移除。若是不移除,就会一直存在在selector.selectedKeys集合中 //待到下一次selector.select() > 0时,这个readyKey又会被处理一次 selecionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel(); if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer1.LOGGER.info("======channel通道已经准备好======="); /* * 当server socket channel通道已经准备好,就能够从server socket channel中获取socketchannel了 * 拿到socket channel后,要作的事情就是立刻到selector注册这个socket channel感兴趣的事情。 * 不然没法监听到这个socket channel到达的数据 * */ ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer1.LOGGER.info("======socket channel 创建链接=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer1.LOGGER.info("======socket channel 数据准备完成,能够去读==读取=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    } /** * 在server socket channel接收到/准备好 一个新的 TCP链接后。 * 就会向程序返回一个新的socketChannel。<br> * 可是这个新的socket channel并无在selector“选择器/代理器”中注册, * 因此程序还无法经过selector通知这个socket channel的事件。 * 因而咱们拿到新的socket channel后,要作的第一个事情就是到selector“选择器/代理器”中注册这个 * socket channel感兴趣的事件 * @param socketChannel 新的socket channel * @param selector selector“选择器/代理器” * @throws Exception */ private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false); //socket通道能够且只能够注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(2048));
    } /** * 这个方法用于读取从客户端传来的信息。 * 而且观察从客户端过来的socket channel在通过屡次传输后,是否完成传输。 * 若是传输完成,则返回一个true的标记。 * @param socketChannel * @throws Exception */ private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel(); //获取客户端使用的端口 InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer resoucePort = sourceSocketAddress.getPort(); //拿到这个socket channel使用的缓存区,准备读取数据 //在后文,将详细讲解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。 ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment(); //将通道的数据写入到缓存区,注意是写入到缓存区。 //因为以前设置了ByteBuffer的大小为2048 byte,因此能够存在写入不完的状况 //不要紧,咱们后面来调整代码。这里咱们暂时理解为一次接受能够完成 int realLen = -1; try {
            realLen = clientSocketChannel.read(contextBytes);
        } catch(Exception e) { //这里抛出了异常,通常就是客户端由于某种缘由终止了。因此关闭channel就好了 SocketServer1.LOGGER.error(e.getMessage());
            clientSocketChannel.close(); return;
        } //若是缓存区中没有任何数据(但实际上这个不太可能,不然就不会触发OP_READ事件了) if(realLen == -1) {
            SocketServer1.LOGGER.warn("====缓存区没有数据?===="); return;
        } //将缓存区从写状态切换为读状态(实际上这个方法是读写模式互切换)。 //这是java nio框架中的这个socket channel的写请求将所有等待。 contextBytes.flip(); //注意中文乱码的问题,我我的喜爱是使用URLDecoder/URLEncoder,进行解编码。 //固然java nio框架自己也提供编解码方式,看我的咯 byte[] messageBytes = contextBytes.array();
        String messageEncode = new String(messageBytes , "UTF-8");
        String message = URLDecoder.decode(messageEncode, "UTF-8"); //若是收到了“over”关键字,才会清空buffer,并回发数据; //不然不清空缓存,还要还原buffer的“写状态” if(message.indexOf("over") != -1) { //清空已经读取的缓存,并重新切换为写状态(这里要注意clear()和capacity()两个方法的区别) contextBytes.clear();
            SocketServer1.LOGGER.info("端口:" + resoucePort + "客户端发来的信息======message : " + message); //====================================================== // 固然接受完成后,能够在这里正式处理业务了  //====================================================== //回发数据,并关闭channel ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("回发处理结果", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else {
            SocketServer1.LOGGER.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + message); //这是,limit和capacity的值一致,position的位置是realLen的位置 contextBytes.position(realLen);
            contextBytes.limit(contextBytes.capacity());
        }
    }
}

代码中的注释是比较清楚的,可是仍是要对几个关键点进行一下讲解:

  • serverChannel.register(Selector sel, int ops, Object att):实际上register(Selector sel, int ops, Object att)方法是ServerSocketChannel类的父类AbstractSelectableChannel提供的一个方法,表示只要继承了 AbstractSelectableChannel类的子类均可以注册到选择器中。经过观察整个AbstractSelectableChannel继 承关系,下图中的这些类能够被注册到选择器中:

这里写图片描述

  • SelectionKey.OP_ACCEPT:不一样的Channel对象能够注册的“我关心的事件”是不同的。例如 ServerSocketChannel除了可以被容许关注OP_ACCEPT时间外,不容许再关心其余事件了(不然运行时会抛出异常)。如下梳理了常使 用的AbstractSelectableChannel子类能够注册的事件列表:
通道类 通道做用 可关注的事件
ServerSocketChannel 服务器端通道 SelectionKey.OP_ACCEPT
DatagramChannel UDP协议通道 SelectionKey.OP_READ、SelectionKey.OP_WRITE
SocketChannel TCP协议通道 SelectionKey.OP_READ、SelectionKey.OP_WRITE、SelectionKey.OP_CONNECT

实际上经过每个AbstractSelectableChannel子类所实现的public final int validOps()方法,就能够查看这个通道“能够关心的IO事件”。

  • selector.selectedKeys().iterator():当选择器Selector收到操做系统的IO操做事件后,它的 selectedKeys将在下一次轮询操做中,收到这些事件的关键描述字(不一样的channel,就算关键字同样,也会存储成两个对象)。可是每个“事件关键字”被处理后都必须移除,不然下一次轮询时,这个事件会被重复处理

Returns this selector’s selected-key set.

Keys may be removed from, but not directly added to, the selected-key set. Any attempt to add an object to the key set will cause an UnsupportedOperationException to be thrown.

The selected-key set is not thread-safe.

5-六、JAVA实例改进

上面的代码中,咱们为了讲解selector的使用,在缓存使用上就进行了简化。实际的应用中,为了节约内存资源,咱们通常不会为一个通道分配那么多的缓存空间。下面的代码咱们主要对其中的缓存操做进行了优化:

package testNSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer2 { static {
        BasicConfigurator.configure();
    } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketServer2.class); /** * 改进的java nio server的代码中,因为buffer的大小设置的比较小。 * 咱们再也不把一个client经过socket channel屡次传给服务器的信息保存在beff中了(由于根本存不下)<br> * 咱们使用socketchanel的hashcode做为key(固然您也能够本身肯定一个id),信息的stringbuffer做为value,存储到服务器端的一个内存区域MESSAGEHASHCONTEXT。 * * 若是您不清楚ConcurrentHashMap的做用和工做原理,请自行百度/Google */ private static final ConcurrentMap<Integer, StringBuffer> MESSAGEHASHCONTEXT = new ConcurrentHashMap<Integer , StringBuffer>(); public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open(); //注意、服务器通道只能注册SelectionKey.OP_ACCEPT事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); try { while(true) { //若是条件成立,说明本次询问selector,并无获取到任何准备好的、感兴趣的事件 //java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。 if(selector.select(100) == 0) { //================================================ // 这里视业务状况,能够作一些然并卵的事情 //================================================ continue;
                } //这里就是本次询问操做系统,所获取到的“所关心的事件”的事件类型(每个通道都是独立的) Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator(); while(selecionKeys.hasNext()) {
                    SelectionKey readyKey = selecionKeys.next(); //这个已经处理的readyKey必定要移除。若是不移除,就会一直存在在selector.selectedKeys集合中 //待到下一次selector.select() > 0时,这个readyKey又会被处理一次 selecionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel(); if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer2.LOGGER.info("======channel通道已经准备好======="); /* * 当server socket channel通道已经准备好,就能够从server socket channel中获取socketchannel了 * 拿到socket channel后,要作的事情就是立刻到selector注册这个socket channel感兴趣的事情。 * 不然没法监听到这个socket channel到达的数据 * */ ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer2.LOGGER.info("======socket channel 创建链接=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer2.LOGGER.info("======socket channel 数据准备完成,能够去读==读取=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    } /** * 在server socket channel接收到/准备好 一个新的 TCP链接后。 * 就会向程序返回一个新的socketChannel。<br> * 可是这个新的socket channel并无在selector“选择器/代理器”中注册, * 因此程序还无法经过selector通知这个socket channel的事件。 * 因而咱们拿到新的socket channel后,要作的第一个事情就是到selector“选择器/代理器”中注册这个 * socket channel感兴趣的事件 * @param socketChannel 新的socket channel * @param selector selector“选择器/代理器” * @throws Exception */ private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false); //socket通道能够且只能够注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT //最后一个参数视为 为这个socketchanne分配的缓存区 socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(50));
    } /** * 这个方法用于读取从客户端传来的信息。 * 而且观察从客户端过来的socket channel在通过屡次传输后,是否完成传输。 * 若是传输完成,则返回一个true的标记。 * @param socketChannel * @throws Exception */ private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel(); //获取客户端使用的端口 InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer resoucePort = sourceSocketAddress.getPort(); //拿到这个socket channel使用的缓存区,准备读取数据 //在后文,将详细讲解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。 ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment(); //将通道的数据写入到缓存区,注意是写入到缓存区。 //此次,为了演示buff的使用方式,咱们故意缩小了buff的容量大小到50byte, //以便演示channel对buff的屡次读写操做 int realLen = 0;
        StringBuffer message = new StringBuffer(); //这句话的意思是,将目前通道中的数据写入到缓存区 //最大可写入的数据量就是buff的容量 while((realLen = clientSocketChannel.read(contextBytes)) != 0) { //必定要把buffer切换成“读”模式,不然因为limit = capacity //在read没有写满的状况下,就会致使多读 contextBytes.flip(); int position = contextBytes.position(); int capacity = contextBytes.capacity(); byte[] messageBytes = new byte[capacity];
            contextBytes.get(messageBytes, position, realLen); //这种方式也是能够读取数据的,并且不用关心position的位置。 //由于是目前contextBytes全部的数据所有转出为一个byte数组。 //使用这种方式时,必定要本身控制好读取的最终位置(realLen很重要) //byte[] messageBytes = contextBytes.array(); //注意中文乱码的问题,我我的喜爱是使用URLDecoder/URLEncoder,进行解编码。 //固然java nio框架自己也提供编解码方式,看我的咯 String messageEncode = new String(messageBytes , 0 , realLen , "UTF-8");
            message.append(messageEncode); //再切换成“写”模式,直接状况缓存的方式,最快捷 contextBytes.clear();
        } //若是发现本次接收的信息中有over关键字,说明信息接收完了 if(URLDecoder.decode(message.toString(), "UTF-8").indexOf("over") != -1) { //则从messageHashContext中,取出以前已经收到的信息,组合成完整的信息 Integer channelUUID = clientSocketChannel.hashCode();
            SocketServer2.LOGGER.info("端口:" + resoucePort + "客户端发来的信息======message : " + message);
            StringBuffer completeMessage; //清空MESSAGEHASHCONTEXT中的历史记录 StringBuffer historyMessage = MESSAGEHASHCONTEXT.remove(channelUUID); if(historyMessage == null) {
                completeMessage = message;
            } else {
                completeMessage = historyMessage.append(message);
            }
            SocketServer2.LOGGER.info("端口:" + resoucePort + "客户端发来的完整信息======completeMessage : " + URLDecoder.decode(completeMessage.toString(), "UTF-8")); //====================================================== // 固然接受完成后,能够在这里正式处理业务了  //====================================================== //回发数据,并关闭channel ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("回发处理结果", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else { //若是没有发现有“over”关键字,说明尚未接受完,则将本次接受到的信息存入messageHashContext SocketServer2.LOGGER.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + URLDecoder.decode(message.toString(), "UTF-8")); //每个channel对象都是独立的,因此可使用对象的hash值,做为惟一标示 Integer channelUUID = clientSocketChannel.hashCode(); //而后获取这个channel下之前已经达到的message信息 StringBuffer historyMessage = MESSAGEHASHCONTEXT.get(channelUUID); if(historyMessage == null) {
                historyMessage = new StringBuffer();
                MESSAGEHASHCONTEXT.put(channelUUID, historyMessage.append(message));
            }
        }
    }
}

以上代码应该没有过多须要讲解的了。固然,您仍是能够加入线程池技术,进行具体的业务处理。注意,必定是线程池,由于这样能够保证线程规模的可控性。

六、多路复用IO的优缺点

  • 不用再使用多线程来进行IO处理了(包括操做系统内核IO管理模块和应用程序进程而言)。固然实际业务的处理中,应用程序进程仍是能够引入线程池技术的

  • 同一个端口能够处理多种协议,例如,使用ServerSocketChannel测测的服务器端口监听,既能够处理TCP协议又能够处理UDP协议。

  • 操做系统级别的优化:多路复用IO技术能够是操做系统级别在一个端口上可以同时接受多个客户端的IO事件。同时具备以前咱们讲到的阻塞式同步IO和非阻塞式同步IO的全部特色。Selector的一部分做用更至关于“轮询代理器”。

  • 都是同步IO:目前咱们介绍的 阻塞式IO、非阻塞式IO甚至包括多路复用IO,这些都是基于操做系统级别对“同步IO”的实现。咱们一直在说“同步IO”,一直都没有详细说,什么叫作“同步IO”。实际上一句话就能够说清楚:只有上层(包括上层的某种代理机制)系统询问我是否有某个事件发生了,不然我不会主动告诉上层系统事件发生了

这个关键概念,在这篇文章以前的几张“原理说明图”中实际上就能够清晰的提现了,可是为了让你们更清楚的总结同步IO、异步IO、阻塞IO、非阻塞IO的概念,下篇文章在讲解异步IO后我会梳理了一张对比表格。

七、异步IO(真正的NIO)

好吧,很差意思,我再一次错估了文章的工做量。JAVA Asynchronous IO的介绍咱们只有再日后推一推了。下一篇文章中,我将详细讲解操做系统支持的异步IO方式,并介绍JAVA 1.7版本中加入的NIO2.0(AIO)对异步IO的实现。上文也说过了,在Linux系统中并无Windows中的IOCP技术,因此Linux技 术使用epoll多路复用技术模拟异步IO。

相关文章
相关标签/搜索