数据库路由中间件MyCat - 源代码篇(7)
3. 链接模块
3.4 FrontendConnection前端链接
构造方法:前端
public FrontendConnection(NetworkChannel channel) throws IOException { super(channel); InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress(); InetSocketAddress remoteAddr = null; if (channel instanceof SocketChannel) { remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress(); } else if (channel instanceof AsynchronousSocketChannel) { remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress(); } this.host = remoteAddr.getHostString(); this.port = localAddr.getPort(); this.localPort = remoteAddr.getPort(); this.handler = new FrontendAuthenticator(this); }
FrontendConnection是对前端链接channel的封装,接受NetworkChannel做为参数构造。前端链接创建,须要先验证其权限,因此,handler首先设置为FrontendAuthenticator
等到验证成功,handler会被设置成FrontendCommandHandler。
下面来看和FrontendConnection相关的Handler:
FrontendCommandHandler会先解析请求类型,以后调用不一样的方法处理不一样类型的请求。例如,FrontendQueryHandler会解析query类型的sql请求语句:java
@Override public void handle(byte[] data) { if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()) { MySQLMessage mm = new MySQLMessage(data); int packetLength = mm.readUB3(); if(packetLength+4==data.length) { source.loadDataInfileData(data); } return; } switch (data[4]) { case MySQLPacket.COM_INIT_DB: commands.doInitDB(); source.initDB(data); break; case MySQLPacket.COM_QUERY: commands.doQuery(); source.query(data); break; case MySQLPacket.COM_PING: commands.doPing(); source.ping(); break; case MySQLPacket.COM_QUIT: commands.doQuit(); source.close("quit cmd"); break; case MySQLPacket.COM_PROCESS_KILL: commands.doKill(); source.kill(data); break; case MySQLPacket.COM_STMT_PREPARE: commands.doStmtPrepare(); source.stmtPrepare(data); break; case MySQLPacket.COM_STMT_EXECUTE: commands.doStmtExecute(); source.stmtExecute(data); break; case MySQLPacket.COM_STMT_CLOSE: commands.doStmtClose(); source.stmtClose(data); break; case MySQLPacket.COM_HEARTBEAT: commands.doHeartbeat(); source.heartbeat(data); break; default: commands.doOther(); source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command"); } }
FrontendCommandHandler会调用FrontendConnection合适的方法解析处理不一样的请求,例如它的initDB(byte[] data)方法:node
public void initDB(byte[] data) { MySQLMessage mm = new MySQLMessage(data); mm.position(5); String db = mm.readString(); // 检查schema的有效性 if (db == null || !privileges.schemaExists(db)) { writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'"); return; } if (!privileges.userExists(user, host)) { writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'"); return; } Set<String> schemas = privileges.getUserSchemas(user); if (schemas == null || schemas.size() == 0 || schemas.contains(db)) { this.schema = db; write(writeToBuffer(OkPacket.OK, allocate())); } else { String s = "Access denied for user '" + user + "' to database '" + db + "'"; writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s); } }
方法调用:
经过查看能够发现,在command packet被解析出是initDB类型的请求时(其实就是用户发送的查询语句为“use XXX”),会调用此方法进行处理,同时,这些方法都是被RW线程执行的。
此方法从FrontedPrivilege中验证用户是否有权限访问这个逻辑库,若是有就把当前链接的逻辑库设为用户请求的逻辑库。
其余方法与handler也是类似的关系,能够看出,FrontendConnection组合了多种封装的handler来处理不一样的请求的不一样阶段。至于各类handler,会在以后sql解析,sql路由,协议实现等模块详细介绍。sql
3.4.1 ServerConnection服务端链接
前端链接包括ServerConnection(服务端链接)和ManagerConnection(管理端链接)。前端连接不会直接建立,而是经过工厂建立:
工厂方法:数据库
@Override protected FrontendConnection getConnection(NetworkChannel channel) throws IOException { SystemConfig sys = MycatServer.getInstance().getConfig().getSystem(); ServerConnection c = new ServerConnection(channel); MycatServer.getInstance().getConfig().setSocketParams(c, true); c.setPrivileges(MycatPrivileges.instance()); c.setQueryHandler(new ServerQueryHandler(c)); c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c)); // c.setPrepareHandler(new ServerPrepareHandler(c)); c.setTxIsolation(sys.getTxIsolation()); c.setSession2(new NonBlockingSession(c)); return c; }
能够看出,每一个新的ServerConnection都会绑定一个新的ServerQueryHandler负责处理sql指令,一个ServerLoadDataInfileHandler负责处理文件载入命令,一个session负责处理事务
下面是相关的类图
这里的全部独立的handler里面都是static方法,可供其余类直接调用。每一个ServerConnection都会有一个NonBlockingSession来处理。
这里说下链接、会话、逻辑库、MyCat实例的关系(与MySQL里面的链接、会话、数据库、MySQL实例的关系不太同样);首先每一个MyCat实例都管理多个数据库。链接是针对MyCat实例创建的,而且,MyCat的链接(AbstractConnection)是不可复用的,在close方法会关闭链接并清理使用的资源。可是缓存资源(buffer)是能够复用的。好比,在一个前端链接长时间空闲时或者出现异常时,会被清理掉。每一个链接会拥有一个session来处理事务,保存会话信息。
这里,每一个链接拥有一个会话。每一个链接中的方法,被RW线程执行,至关于与RW线程绑定。RW线程是能够复用的,这里至关于MySQL中的链接是能够复用的(链接池)。
Session.java:缓存
public interface Session { /** * 取得源端链接 */ FrontendConnection getSource(); /** * 取得当前目标端数量 */ int getTargetCount(); /** * 开启一个会话执行 */ void execute(RouteResultset rrs, int type); /** * 提交一个会话执行 */ void commit(); /** * 回滚一个会话执行 */ void rollback(); /** * 取消一个正在执行中的会话 * * @param sponsor * 若是发起者为null,则表示由本身发起。 */ void cancel(FrontendConnection sponsor); /** * 终止会话,必须在关闭源端链接后执行该方法。 */ void terminate(); }
下面咱们着重研究它的实现类NonBlockingSession:
首先,取得源端链接方法FrontendConnection getSource();,其实就是NonBlockingSession在建立时就已绑定一个链接,谁会调用这个方法取得源端连接呢?
能够发现,主要有各类查询的handler还有SQLengine会去调用。由于处理不管返回什么结果,都须要返回给源端。
int getTargetCount();取得当前目标端数量。根据目标端的数量不一样会用不一样的handler处理转发SQL和合并结果。markdown
@Override public void execute(RouteResultset rrs, int type) { // 清理以前处理用的资源 clearHandlesResources(); if (LOGGER.isDebugEnabled()) { StringBuilder s = new StringBuilder(); LOGGER.debug(s.append(source).append(rrs).toString() + " rrs "); } // 检查路由结果是否为空 RouteResultsetNode[] nodes = rrs.getNodes(); if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) { //若是为空,则表名有误,提示客户端 source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No dataNode found ,please check tables defined in schema:" + source.getSchema()); return; } //若是路由结果个数为1,则为单点查询或事务 if (nodes.length == 1) { //使用SingleNodeHandler处理单点查询或事务 singleNodeHandler = new SingleNodeHandler(rrs, this); try { singleNodeHandler.execute(); } catch (Exception e) { LOGGER.warn(new StringBuilder().append(source).append(rrs), e); source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } } else { //若是路由结果>1,则为多点查询或事务 boolean autocommit = source.isAutocommit(); SystemConfig sysConfig = MycatServer.getInstance().getConfig() .getSystem(); //mutiNodeLimitType没有用。。。 int mutiNodeLimitType = sysConfig.getMutiNodeLimitType(); //使用multiNodeHandler处理多点查询或事务 multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this); try { multiNodeHandler.execute(); } catch (Exception e) { LOGGER.warn(new StringBuilder().append(source).append(rrs), e); source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } } }
每次一个Session执行SQL时,会先清理handler使用的资源。SingleNodeHandler与multiNodeHandler以后会讲。这里的handler咱们以后会在每一个模块去讲,Session以后也还会提到,敬请期待session