一篇文章让你成为 NIO 大师 - MyCAT通讯模型

这篇文章没有详细介绍 NIO 的概念,对于 NIO 不了解的同窗,可根据本身须要,阅读这篇介绍 NIO 的博客 
 
io.mycat.net.NIOAcceptor
NIOAcceptor负责处理客户端(指链接MyCAT以访问数据库的程序)的链接请求。
NIOAcceptor中持有一个Selector字段selector,和一个ServerSocketChannel字段serverChannel。并向selector中注册serverChannel
,并注册感兴趣的事件为 OP_ACCEPT。
NIOAcceptor继承了Thread接口,在run()方法内,生成了一个selector的拷贝tSeletor,并由tSeletor调用select()方法轮询事件是否就绪。
当检测到前段一个链接请求过来时,会调用serverChannel的accept()方法建立一个新的通道,并从工厂获取一个新的前端链接实例(MyCAT和客户端的链接称为前端链接,MyCAT和MySQL的链接称为后端链接),将通道和链接实例绑定。该链接实例由io.mycat.net.NIOProcessor负责管理,NIOProcessor在MyCAT中也是以NIOProcessor池的形式存在的,acceptor从池中拿出一个Processor实例,并将其和链接实例绑定。随后,NIOAcceptor将该链接转交给io.mycat.net.NIOReactor,
NIOAcceptor的工做就结束了。
 
io.mycat.net.NIOReactor
NIOReactor中声明了一个final的内部类 RW,RW继承了Runnable。RW中持有一个Selector字段selector,和一个用来保存AbstractConnection的ConcurrentLinkedQueue队列registerQueue。在RW的run()方法中,调用selector的select()方法(该selector也是由RW的字段selector的拷贝而来,之后如非特别说明,皆是如此),轮询 I/O 事件。
NIOReactor在接到NIOAcceptor发来的前端链接实例后,会将其添加到registerQueue的队尾,并调用selector的wakeup()方法使selector马上返回。阻塞在select()方法以后的是RW的register方法,调用register(这个方法设计的别出心裁,以后会贴出代码),RW会从registerQueue中取出队首的链接实例c,从c中获取NIOSocketWR实例,调用NIOSocketWR的register方法,该方法从链接实例中获取通道,向selector中注册该通道,注册感兴趣的事件为OP_READ。随后,调用链接实例c的register()方法,生成认证数据,发送握手数据包,创建TCP链接。链接创建后,调用NIOSocketWR的asynRead()异步读方法(为何这么作,在贴出代码中会讲述)。
前面讲到RW的run()方法会循环调用selector的select()方法,除了当有新链接加入时,会直接返回以外,selector只有当检测到有注册的 OP_READ 事件就绪时才会返回。当有通道的读事件就绪时,RW会判断该事件是一个“读”事件仍是一个“写”事件,并调用链接实例的相应方法处理该读/写事件。为何还要判断?由于从名字就能够得知,这是一个读写复用的处理类,虽然当前咱们处理的是读事件。
asynRead()异步读方法:获取缓冲区,调用通道的read方法,将数据读到缓冲区。随后,调用链接实例的处理方法处理缓冲区的字节流信息。
 
代码(注释是我加的)
/**
 * 当一个链接由 Acceptor 转发过来时,会使得 selector 立刻返回,调用该方法,
 * 将该链接及其通道注册到 selector 中,  注册 OP_READ 事件。
 * 随后,创建 TCP 链接,创建 TCP 链接的时候也会调用异步读取数据。
 * 若此时刚好有数据到达,则可直接读取。
 * 若此时没有数据到达,则继续执行 select() 轮询 I/O 事件。
 *
 * 当 selector 轮询到有 I/O 事件就绪,而返回时,
 * 如此时 registerQueue 是必定为空的。
 */
private void register(Selector selector) {
   AbstractConnection c = null;
   if (registerQueue.isEmpty()) {
      return;
   }
   while ((c = registerQueue.poll()) != null) {
      try {
         ((NIOSocketWR) c.getSocketWR()).register(selector);
         c.register();
      } catch (Exception e) {
         c.close("register err" + e.toString());
      }
   }
}

 

 
io.mycat.net.AbstractConnection
前面反复提到一个词:链接实例,究竟什么是链接实例?客户端发往MyCAT的每一次请求,以及MyCAT发往MySQL的每一次请求,都是一个链接实例。能够把链接实例看做是一次请求事件的主干,咱们都知道,NIO是一个同步非阻塞的 I/O 模型。阻塞的线程没有作任何有意义的事情,却依然消耗系统资源,这是咱们不能接受的,所谓非阻塞,就是不断的在这条主干上衍生分支,来处理复杂的业务请求,这样主干就不会阻塞。而同步,是指 线程不断轮询 IO 事件是否就绪,主干上衍生的这些分支,都维护了一个Selector对象,Selector代替了主干线程来执行这种轮询,包括前面讲到的acceptor和reactor;这些分支线程是以线程池的形式存在的,是能够复用的,从而减小了频繁建立、启动、挂起、析构新线程的开销,大大提高系统的并发效率。
 
io.mycat.net.NIOHandler
前面讲到了,当数据读到缓冲区后,调用链接实例的处理方法处理缓冲区的字节流。那么,这里是如何处理的呢?事实上,链接实例会先从将数据流从缓冲区读出来,回想一下,这是一个MySQL的中间件,全部的数据都是SQL语句。因此,接下来就是对字节流形式的SQL解包。不要忘了计算机网络的知识,端与端之间的通讯是要按照某种协议的,这就是包头。因此,接下来的工做就是对包头进行解压,分析。通过这一步,MyCAT已经知道了客户端发来的SQL语句是什么类型的语句(登录、增删改查等)。而后,就会调用 NIOHandler 来处理这一条 SQL 语句。
NIOHandler和客户端相关的实体类有:FrontendAuthenticator、FrontendCommandHandler。从名字就能够看出,一个是负责权限验证的,一个是负责处理命令行的。
在FrontendCommandHandler,会根据解析过的包头,根据不一样的SQL语句类型,调用链接实例的相应方法。
 
接下来的事情
接下来就是,在链接实例的细分方法中,将字节流形式的信息转成字符串;而后就是SQL语法分析,生成语法树;展开语法树,计算路由节点;再接下来就是将SQL发给MySQL服务器;而后合并结果集,返回客户端。
我不许备讲语法解析和路由的部分,这不是咱们的重点。因此,如今咱们假设已经作完了这两步,接下来要作的就是将SQL发给MySQL真正执行。
 
NonBlockingSession、SingleNodeHandler/MultiNodeHandler
这个时候,前端链接实例会调用本身NonBlockingSession类型的session字段的execute方法,execute也只作了一件事情,就是根据返回的RouteResultset是单节点的仍是多节点的,决定是调用SingleNodeHandler仍是MultiNodeHandler。session中维护了与每一个节点的后端链接,在nodeHandler中,会从session中取得须要的后端链接,而后只作了一件事,就是将本身设置为后端链接实例的回调。随后,真正的执行就交给了后端链接。
之因此要通过设置回调这一步,是由于nodeHandler会负责解析由MySQL发回的消息。
 
MySQLConnection
MySQLConnection的execute方法中,将通过解析的SQL语句从新封装成消息包,并将该消息包加入到写队列中。前面出现过一个NIOSocketWR的类,由于前端的链接是NIO的,而MyCAT与MySQL的链接是由AIO实现的,所以,MySQLConnection会将把写队列的缓冲区写到Channel的任务交给了AIOSocketWR,AIOSocketWR负责维护一个AsynchronousSocketChannel类型的channel对象,经过调用AsynchronousSocketChannel的:
write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
 
其中,传入的attachment是AIOSocketWR自己,并实现了一个CompletionHandler类AIOWriteHandler,须要重写两个方法,分别是completed和failed,当写入完成后会回调相应的方法。
 
到这个时候,将客户端的命令发给MySQL的工做就作完了,接下来的就是等待MySQL返回结果了。
 
如何知道MySQL是否返回结果了?也是在NIOReactor(mycat对nio和aio关系处理的有点乱),nioreactor轮询到有消息过来的时候,就会教给链接实例去执行异步读方法,这个方法中又调用了socketWR的异步读。注意这个时候链接实例已是后端链接了,因此它会调用AIOSocketWR。异步读和异步写相同,都是使用了Java NIO包封装的类AsynchronousSocketChannel,调用:
read(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
 
在completed中,会调用connection的处理方法,而connection,则会交给以前注册过的回调handler,就是SingleNodeHandler或MultiNodeHandler。
由于MySQL返回的包是一行一行的,所以会屡次调用异步读方法。
 
SingleNodeHandler和MultiNodeHandler都是继承了ResponseHandler,经过观看源码,能够更加轻松地理解这种回调是如何进行的。
 
public interface ResponseHandler {

/** * 没法获取链接 * * @param e * @param conn */ public void connectionError(Throwable e, BackendConnection conn); /** * 已得到有效链接的响应处理 */ void connectionAcquired(BackendConnection conn); /** * 收到错误数据包的响应处理 */ void errorResponse(byte[] err, BackendConnection conn); /** * 收到OK数据包的响应处理 */ void okResponse(byte[] ok, BackendConnection conn); /** * 收到字段数据包结束的响应处理 */ void fieldEofResponse(byte[] header, List<byte[]> fields, byte[] eof, BackendConnection conn); /** * 收到行数据包的响应处理 */ void rowResponse(byte[] row, BackendConnection conn); /** * 收到行数据包结束的响应处理 */ void rowEofResponse(byte[] eof, BackendConnection conn); /** * 写队列为空,能够写数据了 * */ void writeQueueAvailable(); /** * on connetion close event */ void connectionClose(BackendConnection conn, String reason); }
相关文章
相关标签/搜索