数据库路由中间件MyCat - 源代码篇(6)
3. 链接模块
3.3 AbstractConnection:
3.3.2 NIOHandler
NIOHandler实际上就是对于业务处理方法的封装,对于不一样的链接有不一样的处理方法,也就是不一样的NIOHandlerjava
public interface NIOHandler { void handle(byte[] data); }
它的实现以及子类会在以后的对应的处理模块细讲。数据库
3.3.3 NIOSocketWR
实现对于AbstractConnection(实际就是对里面封装的channel)进行异步读写,将从channel中读取到的放到AbstractConnection的readBuffer中,将writeBuffer和写队列中的数据写入到channel中。能够这么说,AbstractConnection的方法只对它里面的buffer进行操做,而buffer与channel之间的交互,是经过NIOSocketWR的方法完成的。
下面是它的方法以及对应的说明:缓存
public void register(Selector selector) throws IOException { try { processKey = channel.register(selector, SelectionKey.OP_READ, con); } finally { if (con.isClosed.get()) { clearSelectionKey(); } } } private void clearSelectionKey() { try { SelectionKey key = this.processKey; if (key != null && key.isValid()) { key.attach(null); key.cancel(); } } catch (Exception e) { AbstractConnection.LOGGER.warn("clear selector keys err:" + e); } }
调用关系:
这个方法就是以前讲的AbstractionConnection与RW线程绑定,AbstractionConnection封装的channel须要在RW线程的selector上注册读事件以监听读事件。markdown
public void doNextWriteCheck() { //检查是否正在写,看CAS更新writing值是否成功 if (!writing.compareAndSet(false, true)) { return; } try { //利用缓存队列和写缓冲记录保证写的可靠性,返回true则为所有写入成功 boolean noMoreData = write0(); //由于只有一个线程能够成功CAS更新writing值,因此这里不用再CAS writing.set(false); //若是所有写入成功并且写入队列为空(有可能在写入过程当中又有新的Bytebuffer加入到队列),则取消注册写事件 //不然,继续注册写事件 if (noMoreData && con.writeQueue.isEmpty()) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (AbstractConnection.LOGGER.isDebugEnabled()) { AbstractConnection.LOGGER.debug("caught err:", e); } con.close("err:" + e); } } private boolean write0() throws IOException { int written = 0; ByteBuffer buffer = con.writeBuffer; if (buffer != null) { //只要写缓冲记录中还有数据就不停写入,但若是写入字节为0,证实网络繁忙,则退出 while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是写缓冲中还有数据证实网络繁忙,计数并退出,不然清空缓冲 if (buffer.hasRemaining()) { con.writeAttempts++; return false; } else { con.writeBuffer = null; con.recycle(buffer); } } //读取缓存队列并写channel while ((buffer = con.writeQueue.poll()) != null) { if (buffer.limit() == 0) { con.recycle(buffer); con.close("quit send"); return true; } buffer.flip(); while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.lastWriteTime = TimeUtil.currentTimeMillis(); con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是写缓冲中还有数据证实网络繁忙,计数,记录下此次未写完的数据到写缓冲记录并退出,不然回收缓冲 if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; return false; } else { con.recycle(buffer); } } return true; } private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); } } private void enableWrite(boolean wakeup) { boolean needWakeup = false; try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); } if (needWakeup && wakeup) { processKey.selector().wakeup(); } }
这个doNextWriteCheck方法以前也讲过,看调用关系:
第一个调用关系没意义,WriteEventCheckRunner这个类从没被调用过。
第二个调用很。。。就是将这个方法简单封装,估计是为了好修改,以后会提两种写策略对比。
第三个调用是主要调用,全部往AbstractionConnection中写入都会调用Abstraction.write(ByteBuffer),这个方法先把要写的放入缓存队列,以后调用上面这个doNextWriteCheck方法。
第四个和第五个都是定时检查任务,为了检查是否有AbstractionConnection的写缓存没有写完的状况网络
@Override public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; //若是buffer为空,证实被回收或者是第一次读,新分配一个buffer给AbstractConnection做为readBuffer if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(); con.readBuffer = theBuffer; } //从channel中读取数据,而且保存到对应AbstractConnection的readBuffer中,readBuffer处于write mode,返回读取了多少字节 int got = channel.read(theBuffer); //调用处理读取到的数据的方法 con.onReadData(got); }
这个方法以前也讲过,异步将channel中的数据读取到readBuffer中,以后调用对应AbstractConnection的处理方法。
调用关系:
按理说,应该只有在RW线程检测到读事件以后,才会调用这个异步读方法。可是在FrontendConnection的register()方法和BackendAIOConnection的register()方法都调用了。这是由于这两个方法在正常工做状况下为了注册一个会先主动发一个握手包,另外一个会先读取一个握手包。因此都会执行异步读方法。 异步