此文已由做者张镐薪受权网易云社区发布。
前端
欢迎访问网易云社区,了解更多网易技术产品运营经验。react
Title:MySql链接创建以及认证过程client->MySql:1.TCP链接请求 MySql->client:2.接受TCP链接client->MySql:3.TCP链接创建MySql->client:4.握手包HandshakePacketclient->MySql:5.认证包AuthPacketMySql->client:6.若是验证成功,则返回OkPacketclient->MySql:7.默认会发送查询版本信息的包MySql->client:8.返回结果包
NIOReactor其实就是一个网络事件反应转发器。 不少地方会用到NIOReactor,这里先讲FrontendConnection和NIOReactor绑定这一部分。上一节说到,NIOAcceptor的accept()最后将FrontendConnection交给了NIOReactor池其中的一个NIOReactor。调用的是 postRegister(AbstractConnection c)方法。sql
final void postRegister(AbstractConnection c) { reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); }
postRegister将刚才传入的FrontendConnection放入RW线程的注册队列。以后,唤醒RW线程的selector。 为何放入RW线程的注册队列,而不是直接注册呢?若是是直接注册,那么就是NIOAcceptor这个线程负责注册,这里就会有锁竞争,由于NIOAcceptor这个线程和每一个RW线程会去竞争selector的锁。这样NIOAcceptor就不能高效的处理链接。因此,更好的方式是将FrontendConnection放入RW线程的注册队列,以后让RW线程本身完成注册工做。 RW线程的源代码:后端
private final class RW implements Runnable { private final Selector selector; private final ConcurrentLinkedQueue<AbstractConnection> registerQueue; private long reactCount; private RW() throws IOException { this.selector = Selector.open(); this.registerQueue = new ConcurrentLinkedQueue<AbstractConnection>(); } @Override public void run() { final Selector selector = this.selector; Set<SelectionKey> keys = null; for (;;) { ++reactCount; try { selector.select(500L); //从注册队列中取出AbstractConnection以后注册读事件 //以后作一些列操做,请参考下面注释 register(selector); keys = selector.selectedKeys(); for (SelectionKey key : keys) { AbstractConnection con = null; try { Object att = key.attachment(); if (att != null) { con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { try { //异步读取数据并处理数据 con.asynRead(); } catch (IOException e) { con.close("program err:" + e.toString()); continue; } catch (Exception e) { LOGGER.debug("caught err:", e); con.close("program err:" + e.toString()); continue; } } if (key.isValid() && key.isWritable()) { //异步写数据 con.doNextWriteCheck(); } } else { key.cancel(); } } catch (CancelledKeyException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } catch (Exception e) { LOGGER.warn(con + " " + e); } } } catch (Exception e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } } } } private void register(Selector selector) { AbstractConnection c = null; if (registerQueue.isEmpty()) { return; } while ((c = registerQueue.poll()) != null) { try { //注册读事件 ((NIOSocketWR) c.getSocketWR()).register(selector); //链接注册,对于FrontendConnection是发送HandshakePacket并异步读取响应 //响应为AuthPacket,读取其中的信息,验证用户名密码等信息,若是符合条件 //则发送OkPacket c.register(); } catch (Exception e) { c.close("register err" + e.toString()); } } } }
由于NIOAcceptor线程和RW线程这两个都会操做RW线程的注册队列,因此要用ConcurrentLinkedQueue RW线程不断检查selector中须要响应的事件,并若是注册队列不为空,就不断注册其中的AbstractConnection,在这里就是FrontendConnection。 以后执行FrontendConnection的register()方法:缓存
@Override public void register() throws IOException { if (!isClosed.get()) { // 生成认证数据 byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // 保存认证数据 byte[] seed = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, seed, 0, rand1.length); System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); this.seed = seed; // 发送握手数据包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; hs.protocolVersion = Versions.PROTOCOL_VERSION; hs.serverVersion = Versions.SERVER_VERSION; hs.threadId = id; hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; // 异步写,本节就讲到这里 hs.write(this); // 异步读取并处理,这个与RW线程中的asynRead()相同,以后客户端收到握手包返回AuthPacket(就是下一节)就是从这里开始看。 this.asynRead(); } }
这个方法就是生成HandshakePacket并发送出去,以后异步读取响应。 以前的示例中MySql的HandshakePacket结构: 能够总结出: HandshakePacket:安全
packet length(3 bytes)网络
packet number (1)架构
protocol version (1)并发
version (null terminated string)框架
thread id (4)
salt (8)
server capabilities (2)
server charset (1)
server status (2)
unused (13)
salt (12)
0x00 --- 结束
这里咱们看下MyCat中的实现这一部分MySql协议栈的packet类结构: 这里能够看出,每一个包都实现了本身的包长度和信息方法,而且针对前段后端链接都有读写方法实现,因此,以后读写数据都会根据场景不一样调用这些类中的方法。这些包就是整个MySql协议栈除逻辑外的内容实现。 HandshakePacket.write(FrontendConnection c)方法将上面传入的数据封装成ByteBuffer,并传入给FrontendConnection c的write(ByteBuffer buffer),这个方法直接继承自AbstractConnection:
public final void write(ByteBuffer buffer) { //首先判断是否为压缩协议 if(isSupportCompress()) { //CompressUtil为压缩协议辅助工具类 ByteBuffer newBuffer= CompressUtil.compressMysqlPacket(buffer,this,compressUnfinishedDataQueue); //将要写的数据先放入写缓存队列 writeQueue.offer(newBuffer); } else { //将要写的数据先放入写缓存队列 writeQueue.offer(buffer); } try { //处理写事件,这个方法比较复杂,须要重点分析其思路 this.socketWR.doNextWriteCheck(); } catch (Exception e) { LOGGER.warn("write err:", e); this.close("write err:" + e); } }
如代码注释中所述,先将要写的数据放入写缓冲队列,以后调用NIOSocketWR.doNextWriteCheck()处理写事件。
public void doNextWriteCheck() { //检查是否正在写,看CAS更新writing值是否成功 if (!writing.compareAndSet(false, true)) { return; } try { //利用缓存队列和写缓冲记录保证写的可靠性,返回true则为所有写入成功 boolean noMoreData = write0(); //由于只有一个线程能够成功CAS更新writing值,因此这里不用再CAS writing.set(false); //若是所有写入成功并且写入队列为空(有可能在写入过程当中又有新的Bytebuffer加入到队列),则取消注册写事件 //不然,继续注册写事件 if (noMoreData && con.writeQueue.isEmpty()) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (AbstractConnection.LOGGER.isDebugEnabled()) { AbstractConnection.LOGGER.debug("caught err:", e); } con.close("err:" + e); } } private boolean write0() throws IOException { int written = 0; ByteBuffer buffer = con.writeBuffer; if (buffer != null) { //只要写缓冲记录中还有数据就不停写入,但若是写入字节为0,证实网络繁忙,则退出 while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是写缓冲中还有数据证实网络繁忙,计数并退出,不然清空缓冲 if (buffer.hasRemaining()) { con.writeAttempts++; return false; } else { con.writeBuffer = null; con.recycle(buffer); } } //读取缓存队列并写channel while ((buffer = con.writeQueue.poll()) != null) { if (buffer.limit() == 0) { con.recycle(buffer); con.close("quit send"); return true; } buffer.flip(); while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.lastWriteTime = TimeUtil.currentTimeMillis(); con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是写缓冲中还有数据证实网络繁忙,计数,记录下此次未写完的数据到写缓冲记录并退出,不然回收缓冲 if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; return false; } else { con.recycle(buffer); } } return true; } private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); } } private void enableWrite(boolean wakeup) { boolean needWakeup = false; try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); } if (needWakeup && wakeup) { processKey.selector().wakeup(); } }
注释已经很详细,如此执行完,便成功将握手包发送给了客户端。 在这里稍微吐槽下,因为MyCat在网络通讯上同时作了AIO和NIO,可是在设计上AbstractionConnection和这些并无关系。可是又涉及到缓存队列,因此设计上出现了一些以下的类模式: 这样应该是不推荐这么设计的,目前我还没想好如何去改善
更多网易技术、产品、运营经验分享请点击。
相关文章:
【推荐】 一行代码搞定Dubbo接口调用
【推荐】 网易美学-系统架构系列1-分布式与服务化
【推荐】 Vue框架核心之数据劫持