此文已由做者张镐薪受权网易云社区发布。
html
欢迎访问网易云社区,了解更多网易技术产品运营经验。前端
上一节咱们讲了后端链接的基本创建和响应处理,那么这些后端链接是何时创建的呢? 首先,MyCat配置文件中,DataHost标签中有minIdle这个属性。表明在MyCat初始化时,会在这个DataHost上初始化维护多少个链接(这些链接能够理解为链接池)。每一个前端Client链接会建立Session,而Session会根据命令的不一样而建立不一样的Handler。每一个Handler会从链接池中拿出所须要的链接并使用。在链接池大小不够时,RW线程会异步驱使新建所需的链接补充链接池,可是链接数最大不能超过配置的maxCon。同时,如以前所述,有定时线程检查并回收空闲后端链接。但池中最小不会小于minCon。 咱们能够经过后端链接的工厂方法的调用链来理解: 看这个调用链,咱们简述下大概的流程。java
st=>start: MyCat接受客户端链接并为之创建惟一绑定的Session e=>end: 将请求发送给对应链接,处理完以后归还链接 op1=>operation: MyCat接受客户端的请求,计算路由 op2=>operation: 根据请求和路由建立合适的handler,这里为SingleNodeHandler op3=>operation: 从PhysicalDBNode中获取后端链接 cond=>condition: 尝试获取链接,链接够用? op4=>operation: 尝试异步建立新的链接 op5=>operation: 经过DelegateResponseHandler将链接与以前的Handler,这里是SingleNodeHandler绑定 st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e
咱们先从Session看起,在MyCat中实现类为NonBlockingSession。在前端链接创建时,会建立绑定惟一的Session: ServerConnectionFactory.java:node
protected FrontendConnection getConnection(NetworkChannel channel) throws IOException { SystemConfig sys = MycatServer.getInstance().getConfig().getSystem(); ServerConnection c = new ServerConnection(channel); MycatServer.getInstance().getConfig().setSocketParams(c, true); c.setPrivileges(MycatPrivileges.instance()); c.setQueryHandler(new ServerQueryHandler(c)); c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c)); // c.setPrepareHandler(new ServerPrepareHandler(c)); c.setTxIsolation(sys.getTxIsolation()); //建立绑定惟一Session c.setSession2(new NonBlockingSession(c)); return c; }
Session主要处理事务,多节点转发协调等,由不一样的ResponseHandler实现; 这些ResponseHandler咱们以后会在对应的模块去细细分析。这里先跳过。 查看SingleNodeHanlder的处理方法 SingleNodeHanlder.java:es6
public void execute() throws Exception { //从这里开始计算处理时间 startTime=System.currentTimeMillis(); ServerConnection sc = session.getSource(); this.isRunning = true; this.packetId = 0; final BackendConnection conn = session.getTarget(node); //以前是否获取过Connection而且Connection有效 if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection MycatConfig conf = MycatServer.getInstance().getConfig(); //从config中获取DataNode PhysicalDBNode dn = conf.getDataNodes().get(node.getName()); //获取对应的数据库链接 dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this, node); } }
从PhysicalDBNode中获取合适的链接:数据库
public void getConnection(String schema,boolean autoCommit, RouteResultsetNode rrs, ResponseHandler handler, Object attachment) throws Exception { checkRequest(schema); //检查数据库链接池是否初始化成功,由于有reload命令 if (dbPool.isInitSuccess()) { //根据是否能在读节点上运行获取链接,通常是判断是否为读请求,而且读请求不在事务中 if (rrs.canRunnINReadDB(autoCommit)) { //根据负载均衡策略选择合适的后端链接 dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment, this.database); } else { //直接选择当前链接池中的的后端链接 dbPool.getSource().getConnection(schema,autoCommit, handler, attachment); } } else { throw new IllegalArgumentException("Invalid DataSource:" + dbPool.getActivedIndex()); } }
PhysicalDBPool类中有负载均衡,切换writeHost,控制write方式等(分别对应balance,writeType等标签)的实现。首先咱们看若是有负载均衡策略(配置了balance)的获取链接的方式:canvas
public void getRWBanlanceCon(String schema, boolean autocommit, ResponseHandler handler, Object attachment, String database) throws Exception { PhysicalDatasource theNode = null; ArrayList<PhysicalDatasource> okSources = null; switch (banlance) { //全部读写节点参与read请求的负载均衡,除了当前活跃的写节点,balance=1 case BALANCE_ALL_BACK: { //返回全部写节点和符合条件的读节点,不包括当前的写节点 okSources = getAllActiveRWSources(true, false, checkSlaveSynStatus()); if (okSources.isEmpty()) { //若是结果即为空,返回当前写节点 theNode = this.getSource(); } else { //不为空,随机选一个 theNode = randomSelect(okSources); } break; } //全部读写节点参与read请求的负载均衡,balance=2 case BALANCE_ALL: { //返回全部写节点和符合条件的读节点 okSources = getAllActiveRWSources(true, true, checkSlaveSynStatus()); //随机选一个 theNode = randomSelect(okSources); break; } case BALANCE_ALL_READ: { //返回全部符合条件的读节点 okSources = getAllActiveRWSources(false, false, checkSlaveSynStatus()); //随机取一个 theNode = randomSelect(okSources); break; } //不作负载均衡,balance=0或其余不为以上的值 case BALANCE_NONE: default: // return default write data source theNode = this.getSource(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("select read source " + theNode.getName() + " for dataHost:" + this.getHostName()); } theNode.getConnection(schema, autocommit, handler, attachment); }
其中涉及到的方法:后端
返回符合条件节点集:安全
private ArrayList<PhysicalDatasource> getAllActiveRWSources( boolean includeWriteNode, boolean includeCurWriteNode, boolean filterWithSlaveThreshold) { int curActive = activedIndex; ArrayList<PhysicalDatasource> okSources = new ArrayList<PhysicalDatasource>(this.allDs.size()); //判断写节点 for (int i = 0; i < this.writeSources.length; i++) { PhysicalDatasource theSource = writeSources[i]; //判断写节点是不是active,可能reload会置为inactive,可能多个写节点可是只有一个是活跃在用的(writeType=0) if (isAlive(theSource)) { //负载均衡策略是否包含写节点 if (includeWriteNode) { //判断是否包含当前活跃的写入节点 if (i == curActive && includeCurWriteNode == false) { // not include cur active source } else if (filterWithSlaveThreshold) { //若是包含从节点同步延迟限制,检查同步状态 if (canSelectAsReadNode(theSource)) { okSources.add(theSource); } else { //若是同步状态不对,则不添加这个写节点 continue; } } else { okSources.add(theSource); } } //检查theSource对应的读节点 if (!readSources.isEmpty()) { // 检查theSource对应的读节点(从节点) PhysicalDatasource[] allSlaves = this.readSources.get(i); if (allSlaves != null) { for (PhysicalDatasource slave : allSlaves) { if (isAlive(slave)) { //若是包含从节点同步延迟限制,检查同步状态 if (filterWithSlaveThreshold) { if (canSelectAsReadNode(slave)) { //若是同步状态正确,则把读节点加入 okSources.add(slave); } else { continue; } } else { okSources.add(slave); } } } } } } else { // TODO : add by zhuam // 若是写节点不OK, 也要保证临时的读服务正常 if (this.dataHostConfig.isTempReadHostAvailable()) { if (!readSources.isEmpty()) { // check all slave nodes PhysicalDatasource[] allSlaves = this.readSources.get(i); if (allSlaves != null) { for (PhysicalDatasource slave : allSlaves) { if (isAlive(slave)) { if (filterWithSlaveThreshold) { if (canSelectAsReadNode(slave)) { okSources.add(slave); } else { continue; } } else { okSources.add(slave); } } } } } } } } return okSources; }
检查是否判断主从延迟:性能优化
private boolean checkSlaveSynStatus() { return (dataHostConfig.getSlaveThreshold() != -1) && (dataHostConfig.getSwitchType() == DataHostConfig.SYN_STATUS_SWITCH_DS); }
随机选择节点:
/** * TODO: modify by zhuam * <p/> * 随机选择,按权重设置随机几率。 * 在一个截面上碰撞的几率高,但调用量越大分布越均匀,并且按几率使用权重后也比较均匀,有利于动态调整提供者权重。 * * @param okSources * @return */ public PhysicalDatasource randomSelect(ArrayList<PhysicalDatasource> okSources) { if (okSources.isEmpty()) { return this.getSource(); } else { int length = okSources.size(); // 总个数 int totalWeight = 0; // 总权重 boolean sameWeight = true; // 权重是否都同样 for (int i = 0; i < length; i++) { int weight = okSources.get(i).getConfig().getWeight(); totalWeight += weight; // 累计总权重 if (sameWeight && i > 0 && weight != okSources.get(i - 1).getConfig().getWeight()) { // 计算全部权重是否同样 sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // 若是权重不相同且权重大于0则按总权重数随机 int offset = random.nextInt(totalWeight); // 并肯定随机值落在哪一个片段上 for (int i = 0; i < length; i++) { offset -= okSources.get(i).getConfig().getWeight(); if (offset < 0) { return okSources.get(i); } } } // 若是权重相同或权重为0则均等随机 return okSources.get(random.nextInt(length)); //int index = Math.abs(random.nextInt()) % okSources.size(); //return okSources.get(index); } }
根据writeType获取当前writeHost方法:
public PhysicalDatasource getSource() { switch (writeType) { //writeType=0,返回当前active的writeHost case WRITE_ONLYONE_NODE: { return writeSources[activedIndex]; } //writeType=1,随机发到一个writeHost case WRITE_RANDOM_NODE: { int index = Math.abs(wnrandom.nextInt()) % writeSources.length; PhysicalDatasource result = writeSources[index]; if (!this.isAlive(result)) { // find all live nodes ArrayList<Integer> alives = new ArrayList<Integer>(writeSources.length - 1); for (int i = 0; i < writeSources.length; i++) { if (i != index) { if (this.isAlive(writeSources[i])) { alives.add(i); } } } if (alives.isEmpty()) { result = writeSources[0]; } else { // random select one index = Math.abs(wnrandom.nextInt()) % alives.size(); result = writeSources[alives.get(index)]; } } if (LOGGER.isDebugEnabled()) { LOGGER.debug("select write source " + result.getName() + " for dataHost:" + this.getHostName()); } return result; } //参数不正确 default: { throw new java.lang.IllegalArgumentException("writeType is " + writeType + " ,so can't return one write datasource "); } } }
更多网易技术、产品、运营经验分享请点击。
相关文章:
【推荐】 canvas 动画库 CreateJs 之 EaselJS(下篇)
【推荐】 针对低网速的性能优化
【推荐】 nej+regular环境使用es6的低成本方案