算法原理是注册中心有一个队列表PAMIRS_S
,它包含以下关键信息:QUEUE_ID是队列标识 CUR_SERVER是当前分配服务器标识,REQ_SERVER是申请分配服务器标识。 java
假若有1,2,3,4,5个队列,有A,B,C三个服务器依次启动。则算法的规则是这样的: mysql
A启动的时候: 算法
因为没有其它的主机,则将全部的队列分配给A。 sql
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
|
3 | A |
|
4 | A | |
5 | A | |
B启动的时候: 数据库
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
|
4 | A | B |
5 | A | |
C启动的时候: 服务器
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | |
5 | A | B |
D启动的时候: 性能
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | D |
5 | A | |
上述算法中实现了预分配,那何时实现正式分配呢?当在获取任务队列的时候(必须控制在当前服务器中的全部任务都执行完毕的状况下,不然会重复执行任务的可能性)会先释放本身已经持有,可是别人要申请的队列,将这些队列让给申请人。 ui
好比当前队列是A,在执行释放队列前的数据状态是: this
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | D |
5 | A | |
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | B | |
3 | C |
|
4 | D | |
5 | A | |
最开始的代码是在TBScheduleManager的方法assignScheduleTask方法。 spa
public void assignScheduleTask() throws Exception { int clearServerCount = scheduleCenter .clearExpireScheduleServer(this.taskTypeInfo,this.taskTypeRunningInfo); List<ScheduleServer> serverList = scheduleCenter .selectAllValidScheduleServer(this.getTaskTypeRunningInfo().getTaskType()); int clearTaskQueueInfoCount = scheduleCenter.clearTaskQueueInfo( this.getTaskTypeRunningInfo().getTaskType(), serverList); boolean isNeedReAssign = false; if (clearServerCount > 0 || clearTaskQueueInfoCount > 0) { isNeedReAssign = true; } else { for (ScheduleServer item : serverList) { //注意,比较时间必定要用数据库时间 if (item.getCenterServerTime().getTime() - item.getRegisterTime().getTime() < taskTypeInfo.getJudgeDeadInterval() * 3 ) { isNeedReAssign = true; break; } } } if (isNeedReAssign == true) { scheduleCenter.assignQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(), serverList); } if (log.isDebugEnabled()) { //log.debug(message); } }
它会先查询一下是否须要从新分配队列,当已经清理过过时的服务器,或者已经清理过非法服务器持有的队列,或者有新的服务器(注册时间距离如今时间小于3个时间周期)注册的时候,则须要从新预分配队列。比较时间必定要以注册中心的时间为准。
须要从新预分配队列则进入方法scheduleCenter.assignQueue。
private Connection getConnection() throws SQLException{ Connection result = this.dataSource.getConnection(); if(result.getAutoCommit() == true){ result.setAutoCommit(false); } return result; } public void assignQueue(String taskType, String currentUuid, List<ScheduleServer> serverList) throws Exception { Connection conn = null; try{ conn = this.getConnection(); clientInner.assignQueue(conn, taskType,currentUuid,serverList); conn.commit(); }catch(Throwable e){ if(conn != null){ conn.rollback(); } if(e instanceof Exception){ throw (Exception)e; }else{ throw new Exception(e); } }finally{ if(conn!= null){ conn.close(); } } }
这个方法说明链接关闭了自动提交,方法内的多个SQL执行是在一个事务里的。这个很是关键。
/** * 从新分配任务处理队列 * * @param taskType * @param serverList * @throws Exception */ public void assignQueue(Connection conn,String taskType, String currentUuid, List<ScheduleServer> serverList) throws Exception { this.lockTaskTypeRunningInfo(conn,taskType, currentUuid); String sqlQueue = " SELECT TASK_TYPE,QUEUE_ID,CUR_SERVER,REQ_SERVER FROM " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " WHERE TASK_TYPE = ? ORDER BY QUEUE_ID"; PreparedStatement stmtQueue = conn.prepareStatement(sqlQueue); stmtQueue.setString(1, taskType); ResultSet setQueue = stmtQueue.executeQuery(); int point = 0; int taskCount = 0; while (setQueue.next()) { PreparedStatement stmtUpdateQueue = null; String sqlModifyQueue = ""; if (setQueue.getString("CUR_SERVER") == null) { sqlModifyQueue = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " SET CUR_SERVER = ?,REQ_SERVER = null,GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and QUEUE_ID = ? "; stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue); stmtUpdateQueue.setString(1, serverList.get(point) .getUuid()); stmtUpdateQueue.setString(2, taskType); stmtUpdateQueue .setString(3, setQueue.getString("QUEUE_ID")); stmtUpdateQueue.executeUpdate(); stmtUpdateQueue.close(); } else if (!(serverList.get(point).getUuid().equalsIgnoreCase( setQueue.getString("CUR_SERVER")) == true && setQueue .getString("REQ_SERVER") == null)) { sqlModifyQueue = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " SET REQ_SERVER = ? ,GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and QUEUE_ID = ? "; stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue); stmtUpdateQueue.setString(1, serverList.get(point) .getUuid()); stmtUpdateQueue.setString(2, taskType); stmtUpdateQueue .setString(3, setQueue.getString("QUEUE_ID")); stmtUpdateQueue.executeUpdate(); stmtUpdateQueue.close(); } else { // 不须要修改当前记录的信息 } taskCount = taskCount + 1; if (point >= serverList.size() - 1) { point = 0; } else { point = point + 1; } } setQueue.close(); stmtQueue.close(); if (taskCount == 0) { throw new Exception("没有对任务类型配置数据处理队列,TASK_TYPE = " + taskType); } }
public void lockTaskTypeRunningInfo(Connection conn,String taskType, String lockServerUuid) throws Exception { String sql = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_TASKTRUN") + " set LAST_ASSIGN_TIME = " + getDataBaseSysdateString(conn) + ",LAST_ASSIGN_UUID = ? , GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " where TASK_TYPE = ? "; PreparedStatement statement = conn.prepareStatement(sql); statement.setString(1, lockServerUuid); statement.setString(2, taskType); statement.executeUpdate(); statement.close(); }
分配队列以前,会先调用方法lockTaskTypeRunningInfo对这个运行期类型进行加锁,看它使用的SQL语句能够看出来,它是使用了数据库实现的行锁(或者范围锁)来实现加锁,避免多个进程同时分配队列时的冲突,其它进程若要更新该行须要等待释放锁。这就要求咱们在建表的时候必定要对字段TASK_TYPE创建索引,而且若是是mysql的话,要选择支持行锁的表引擎,避免锁粒度过大致使的系统性能问题。
分配队列的实现是先查询出该任务全部的队列列表,而后循环这个列表,依次给这个队列列表分配服务器,参数输入的是有效服务器列表。
这个代码就实现了上述算法。它依次对队列列表进行循环,有下面这些状况:
若是当前队列未分配服务器(即 CUR_SERVER=null)则将当前服务器分配给该队列(即赋值给CUR_SERVER字段);
若是当前队列已经分配服务器(即 CUR_SERVER!=null),而且分配的服务器不是当前服务器,则将当前服务器设置为待分配服务器(即赋值给REQ_SERVER字段);若是是当前服务器则表示应分配,就没有必要再放入待分配服务器。
其中服务器的选择是循环的,由于服务器的数量可能小于队列数。选择到最后一个服务器则下一个又回到第一个服务器。
这样就实现了服务器能够均匀的分配给多个队列,当服务器数大于队列数的时候就有可能会出现有的服务器没法分配给对应的任务队列的问题,会报警。
在调度管理器中有一个获取当前服务器某个任务队列列表的方法,查看该方法源码能够看到检查处理器中的数据是否已经处理完,若没有处理完则会循环等待阻塞程序直处处理完成才能继续获取任务队列。它最终调用了私有方法getCurrentScheduleQueueNow。
/** * 从新加载当前服务器的任务队列 * 一、释放当前服务器持有,但有其它服务器进行申请的任务队列 * 二、从新获取当前服务器的处理队列 * * 为了不此操做的过分,阻塞真正的数据处理能力。系统设置一个从新装载的频率。例如1分钟 * * 特别注意: * 此方法的调用必须是在当前全部任务都处理完毕后才能调用,不然是否任务队列后可能数据被重复处理 */ @SuppressWarnings("static-access") public List<String> getCurrentScheduleQueue() { try{ if (this.isNeedReloadQueue == true) { //特别注意:须要判断数据队列是否已经空了,不然可能在队列切换的时候致使数据重复处理 //主要是在线程不休眠就加载数据的时候必定须要这个判断 if (this.processor != null) { while (this.processor.isDealFinishAllData() == false) { Thread.currentThread().sleep(50); } } //真正开始处理数据 this.getCurrentScheduleQueueNow(); } this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis(); return this.currentTaskQueue; }catch(Exception e){ throw new RuntimeException(e); } }
getCurrentScheduleQueueNow方法才真正实现了获取队列的逻辑,咱们进去看一下。
private List<String> getCurrentScheduleQueueNow() throws Exception { //是否被人申请的队列 this.scheduleCenter.releaseDealQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid()); //从新查询当前服务器可以处理的队列 this.currentTaskQueue = this.scheduleCenter.reloadDealQueue( this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid()); //若是超过10个心跳周期尚未获取到调度队列,则报警 if(this.currentTaskQueue.size() ==0 && ScheduleUtil.getCurrentTimeMillis() - this.lastReloadTaskQueueTime > this.taskTypeInfo.getHeartBeatRate() * 10){ String message ="调度服务器" + this.currenScheduleServer.getUuid() +"[TASK_TYPE=" + this.getTaskTypeRunningInfo().getTaskType() + "]自启动以来,超过10个心跳周期,还 没有获取到分配的任务队列"; log.warn(message); if(this.scheduleAlert != null){ this.scheduleAlert.noTaskQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(),message); } } if(this.currentTaskQueue.size() >0){ //更新时间戳 this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis(); } return this.currentTaskQueue; }
它先调用了scheduleCenter.releaseDealQueue方法释放本身的队列,即下列代码。而后从新加载本身的队列,当10个周期获取到的队列数为0则会报警。
/** * 释放本身把持,别人申请的队列 * * @param taskType * @param uuid * @return * @throws Exception */ public void releaseDealQueue(Connection conn,String taskType, String uuid) throws Exception { String querySql = "select QUEUE_ID from " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND REQ_SERVER IS NOT NULL "; PreparedStatement stmtQueue = conn.prepareStatement(querySql); stmtQueue.setString(1, taskType); stmtQueue.setString(2, uuid); ResultSet set = stmtQueue.executeQuery(); List<String> queueIds = new ArrayList<String>(); while(set.next()){ queueIds.add(set.getString("QUEUE_ID")); } set.close(); stmtQueue.close(); String sqlQueue = " update " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " set CUR_SERVER = REQ_SERVER,REQ_SERVER = NULL, GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND QUEUE_ID = ? AND REQ_SERVER IS NOT NULL "; for(String queueId:queueIds){ stmtQueue = conn.prepareStatement(sqlQueue); stmtQueue.setString(1, taskType); stmtQueue.setString(2, uuid); stmtQueue.setString(3, queueId); stmtQueue.executeUpdate(); stmtQueue.close(); conn.commit(); } }该方法的实现是查询当前任务分给当前服务器的全部队列列表,而后会依次循环将字段REQ_SERVER的值赋给字段CUR_SERVER,也就是表示将待分配服务器正式设置为已分配服务器,而且将REQ_SERVER设置为空,这也就实现了服务器释放算法。