基于NIO的消息路由的实现(一) 前言

1、前言:java

已经好久没有碰编码了,大概有9年的时间,突飞猛进的框架和新东西让我眼花缭乱。以前一直在作web相关的应用。因为项目不大,分布式开发在我编码的那个年代里没有作过,后来走上管理岗位才接触到,仅限于沟通交流和方案的策划,并无真正的作过。现在我有了一点时间和精力,决定本身学习一下,先从简单的消息通信开始吧。
git

好,背景完毕!下面说说我想作的东西,我想作一个基于NIO的消息路由,而并不基于目前已有的各类优秀框架(mina,netty等等),这么作的初衷也许跟我我的的习惯有关,我老是以为若是不明白原理,即便再好的框架当遭遇问题的时候,我也会无从下手,若是我懂得了原理,再选用其余的框架,也会更驾轻就熟。因此才没有使用现今那些优秀的框架,或许是个人一点点偏见吧。web

个人代码已经发布在 http://git.oschina.net/java616网络

目已经完成根据客户端的标识进行消息的异步转发,仍会持续的迭代和增长。有兴趣的能够下载回去,若是我有作的很差或者不对的地方,敬请指出。框架

2、一些概念和例程异步

NIO是啥我就不说了,咱们来看一下我理解的NIO工做流程,如图:socket

上图为我所理解的NIO的工做过程,若是存在问题,请批评斧正。归纳一下个人理解:分布式

  • SocketChannel:为NIO工做过程当中,数据传输的通道,客户端与服务端的每次交互都是经过此通道进行的;学习

  • Selector(多路复用器):会监控其注册的通道上面的任何事件,得到SelectionKey,事件分为OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(这是SelectionKey的四个属性),OP_ACCEPT应该为服务端接收到客户端链接时的一种状态,我在客户端并无用到此状态;OP_CONNECT则为客户端已经链接上服务端的一种状态,我在服务端并无使用这个状态;测试

  • Buffer:个人应用中,我一直使用ByteBuffer,此类是整个NIO通信的关键,必须理解才能进行通信的开发,不然可能产生问题;全部的通信内容都须要在此类中写入和读出;


若是想作nio相关的应用,那么一些概念上的东西是不可回避的,在这里推荐:http://www.iteye.com/magazines/132-Java-NIO 。

下面三段代码,分别完成了服务的建立、服务对事件的监听以及客户端对事件的监听(不可直接拷贝使用,有一些变量没有声明,若有兴趣,能够去下载个人源码)。

  • 服务的建立

//打开一个serversocket通道,ServerSocketChannel是一个监控是否有新链接进入的通道。
serverSocketChannel = ServerSocketChannel.open();
//将这个serversokect通道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定serversokect的ip和端口
serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));
//打开选择器
selector = Selector.open();
//将此通道注册给选择器selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 服务对事件的监听

                //监听事件key
                selector.select(2000);
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定义一个socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());
                    //删除Iterator中的当前key,避免重复处理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //从客户端送来的key中获取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到链接才会继续
                        socketChannel = serverSocketChannel.accept();
                        //将此socket通道设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将此通道注册到selector,并等待接收客户端的读入数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //获取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理缓冲区,便于使用
                        byteBuffer.clear();
                        //将channel中的字节流读入缓冲区
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //处理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//若是当前包存在非法抛出异常,那么再也不进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }


  • 客户端对事件的监听

            while (true) {
                try {

                    selector.select(3000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    for (int i = 0; keys.hasNext(); i++) {

                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isConnectable()) {
                            socketChannel = (SocketChannel) key.channel();
                            if (socketChannel.isConnectionPending()) {
                                if (socketChannel.finishConnect()){
                                    Client.IS_CONNECT =true;
                                    logger.info("-------成功链接服务端!-------");
                                }

                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            //获取事件key中的channel
                            socketChannel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);
                            //清理缓冲区,便于使用
                            byteBuffer.clear();
                            //将channel中的字节流读入缓冲区
                            String readStr = "";
                            int count = socketChannel.read(byteBuffer);
                            //务必要把buffer的position重置为0
                            byteBuffer.flip();

                            handlePacket(byteBuffer, count);
//                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isWritable()) {
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }

                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }

            }

3、我要作的是个啥?

根据我我的对NIO的理解,个人初步想法是要实现一个这样的东西,如图:

但在个人不断深刻开发中,发现上面的图中不少不成熟的内容,做为一个完整的消息通信的服务,必须包含以下的内容:

一、对接入链接的管理;

二、对链接身份的确认;

三、对异常关闭链接的回收;

四、根据身份对消息的转发;

五、链路的维持;

六、自动重连;

七、消息的异步处理;

八、消息的响应机制;

九、粘包和断包的处理;

九、配置体系;

十、通信层与业务层的分离;

………………

网上不少的NIO实例都是能够运行的,但并不能知足个人工做须要,以上的那些确定还有没有考虑全的东西,随着我一点点的开发会逐渐的浮出水面。

在将来的文章中,我会逐步把我本身制定的通信协议,各个模块的结构,以及代码贴出来,但愿你们可以互相学习,互相帮助。(待续)

相关文章
相关标签/搜索