因为最近工做比较忙,前先后后花了两个月的时间把TBSchedule的源码翻了个底朝天。关于TBSchedule的使用,网上也有不少参考资料,这里不作过多的阐述。本文着重介绍TBSchedule的运行机制,架构设计以及优化建议。经过学习别人的经验,来提升本身的技术能力,感觉阿里人的智慧,也向阿里空玄,阿里玄难为开源贡献致敬。java
TBSchedule依赖于ZK存储调度数据,在使用中充当着nosql的角色,zk的watch机制只用于zk重连,提升可靠性。下图是zk与tbschedule的部署图。算法
TBSchedule有不少特性,包括批量任务,多主机,多线程,动态扩展,实时或定时任务,分片,并发,不重复执行。在介绍这些特性以前,先来了解一下整个zk目录结构,有助于理解整个调度过程。下图是zk调度数据结构图。其中()内表示zk目录保存的数据。sql
1)TBSchedule在zookeeper初始化完成以后初始化数据,其中建立basetasktype,stractegy,factory目录。调用registerManagerFactory,在factory目录下建立瞬时有序节点,节点名称(IP+$+HostName+$+UUID+$Sequence),而后根据ip是否在ip管理范围内,在strategy目录下添加或删除对应的(IP+$+HostName+$+UUID+$Sequence)瞬时目录节点。最后启动默认的refresh()操做。数组
2)TBSchedule在每2s中zk正常状况下执行一次refresh操做,该操做若是查询zk管理信息异常则中止全部调度任务后从新注册管理器工厂,若是管理器start状态=false,则中止全部调度任务。具体实如今TBScheduleManagerFactory的reRegisterManagerFactory()中。具体代码以下:缓存
public void reRegisterManagerFactory() throws Exception { // 根据UUID,在/factory目录下查找对应目录,并在/strategy目录下更具IP数组, //肯定可管理的strtegyName下建立(IP+$+HostName+$+UUID+$Sequence)目录 // 返回不可管理的调度策略类型名称,并中止对应的调度处理器 List<String> stopList = this.getScheduleStrategyManager() .registerManagerFactory(this); for (String strategyName : stopList) { this.stopServer(strategyName); //中止对应的调度处理器 } //根据策略从新分配调度任务机器的任务数,并在zk上更新对应的ScheduleStrategyRunntime中的AssignNum this.assignScheduleServer(); //注意,一个strategyName下只有惟一表示当前调度服务器的节点(IP+$+HostName+$+UUID+$Sequence) //同时一个strategyName对应该调度服务器多个IStrategyTask任务管理器,一个taskItem对应一个任务管理器 //多则删停,少则加起 this.reRunScheduleServer(); }
这边再介绍下tbschedule的任务分配策略,列如当前有4台机器(A,B,C,D),共10个任务(0,1..9)。首先将10个任务均等分,每一个服务器能够分配到2个任务,最后剩余两个任务将给A,B服务器得到,具体算法以下:安全
/** * 分配任务数量 * @param serverNum 总的服务器数量 * @param taskItemNum 任务项数量 * @param maxNumOfOneServer 每一个server最大任务项数目 * @param maxNum 总的任务数量 * @return */ public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){ int[] taskNums = new int[serverNum]; int numOfSingle = taskItemNum / serverNum; int otherNum = taskItemNum % serverNum; //20150323 删除, 任务分片保证分配到全部的线程组数上。 开始 // if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) { // numOfSingle = maxNumOfOneServer; // otherNum = 0; // } //20150323 删除, 任务分片保证分配到全部的线程组数上。 结束 for (int i = 0; i < taskNums.length; i++) { if (i < otherNum) { taskNums[i] = numOfSingle + 1; } else { taskNums[i] = numOfSingle; } } return taskNums; }
3)接下来根据每一个strategyName下得到的任务数,来建立对应任务调度管理器数。具体实如今reRunScheduleServer()方法中,循环建立IStrategyTask,根据调度类型Schedule,Java,bean实例化不一样的任务管理器TBScheduleManagerStatic,也包括自定义管理器,只要继承IStrategyTask接口就能够了。列如自定义管理器,须要配置taskname为java全类名或者bean的名称。服务器
package com.taobao.pamirs.schedule.test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.taobao.pamirs.schedule.strategy.IStrategyTask; /** * 自定义任务管理器,调度类型为Java,Bean * @author Administrator * */ public class JavaTaskDemo implements IStrategyTask,Runnable { protected static transient Logger log = LoggerFactory.getLogger(JavaTaskDemo.class); private String parameter; private boolean stop = false; public void initialTaskParameter(String strategyName,String taskParameter) { parameter = taskParameter; new Thread(this).start(); } @Override public void stop(String strategyName) throws Exception { this.stop = true; } @Override public void run() { while(stop == false){ log.error("执行任务:" + this.parameter); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
对于经常使用的Schedule调度类型,使用的是TBScheduleManagerStatic管理器。网络
4)任务调度分配器TBScheduleManager,可以使得任务分片被不重复,不遗漏的快速处理。该功能也是TBSchedule的核心实现,一个JVM能够包含不一样taskType的多个任务调度分配器。也就是说能够有相同任务taskType的多个任务管理器,也能够存在不一样的tasktype的任务管理器。每一个任务管理器包含一个任务处理器IScheduleProcessor,IScheduleProcessor其实是个Runnnable对象,根据任务类型的线程数来初始化调度线程。任务处理器分为SLEEP和NotSleep模式。数据结构
下面是建立TBScheduleManager的操做。多线程
TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{ this.factory = aFactory; this.currentSerialNumber = serialNumber(); this.scheduleCenter = aScheduleCenter; this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType); log.info("create TBScheduleManager for taskType:"+baseTaskType); //清除已通过期1天的TASK,OWN_SIGN的组合。超过一天没有活动server的视为过时 this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除过时OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval()); Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName()); if (dealBean == null) { throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在"); } if (dealBean instanceof IScheduleTaskDeal == false) { throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 没有实现 IScheduleTaskDeal接口"); } this.taskDealBean = (IScheduleTaskDeal)dealBean; if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){ throw new Exception("数据配置存在问题,死亡的时间间隔,至少要大于心跳线程的5倍。当前配置数据:JudgeDeadInterval = " + this.taskTypeInfo.getJudgeDeadInterval() + ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate()); } //生成ScheduleServer信息。 this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber()); //设置ScheduleServer的ManagerFactoryUUID this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid()); //在/server下注册ScheduleServer信息,实际上能够当作在server目录下的每个子节点表示一个任务调度管理器 scheduleCenter.registerScheduleServer(this.currenScheduleServer); this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid(); this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat"); this.heartBeatTimer.schedule(new HeartBeatTimerTask(this), new java.util.Date(System.currentTimeMillis() + 500), this.taskTypeInfo.getHeartBeatRate()); initial(); }
5)上面有两个重要的操做,一个是心跳调度器,主要职责是更新/server目录下对应的调度管理器心跳信息,清除过时的scheduleServer,若是是leader则进行任务项的分配。
class HeartBeatTimerTask extends java.util.TimerTask { private static transient Logger log = LoggerFactory .getLogger(HeartBeatTimerTask.class); TBScheduleManager manager; public HeartBeatTimerTask(TBScheduleManager aManager) { manager = aManager; } public void run() { try { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); manager.refreshScheduleServerInfo(); } catch (Exception ex) { log.error(ex.getMessage(), ex); } } }
/** * 若是发现本次更新的时间若是已经超过了,服务器死亡的心跳周期,则不能在向服务器更新信息。 * 而应该看成新的服务器,进行从新注册。 * @throws Exception */ public void refreshScheduleServerInfo() throws Exception { try{ //在/server下更新任务调度服务器的心跳时间,调度信息 rewriteScheduleInfo(); //若是任务信息没有初始化成功,不作任务相关的处理,未完成init() if(this.isRuntimeInfoInitial == false){ return; } //从新分配任务,leader从新检查可用调度管理器,并修改taskItem下的current_server,req_server. this.assignScheduleTask(); //判断是否须要从新加载任务队列,避免任务处理进程没必要要的检查和等待 //思路:每一次修改了taskitem的任务分配以后,会在/taskitem下保存leader信息,及默认版本号-1 //比较保存的上一次任务加载的版本号是否 < 当前的版本号 boolean tmpBoolean = this.isNeedReLoadTaskItemList(); if(tmpBoolean != this.isNeedReloadTaskItem){ //只要不相同,就设置须要从新装载,由于在心跳异常的时候,作了清理队列的事情,恢复后须要从新装载。 synchronized (NeedReloadTaskItemLock) { this.isNeedReloadTaskItem = true; } rewriteScheduleInfo(); } if(this.isPauseSchedule == true || this.processor != null && processor.isSleeping() == true){ //若是服务已经暂停了,则须要从新定时更新 cur_server 和 req_server //若是服务没有暂停,必定不能调用的 //调度服务策略若是已经失效,会抛出异常 //加载任务list<taskDefine> this.getCurrentScheduleTaskItemListNow(); } }catch(Throwable e){ //清除内存中全部的已经取得的数据和任务队列,避免心跳线程失败时候致使的数据重复 this.clearMemoInfo(); if(e instanceof Exception){ throw (Exception)e; }else{ throw new Exception(e.getMessage(),e); } } }
其中的this.assignScheduleTask();实现了任务调度管理器的变化而相应的修改/taskItem下curr_server和req_server的调度变化。核心思想:rewriteScheduleInfo()中没有相应的调度服务器,则在/server下注册。而后获取有效的全部调度服务器,遍历全部任务项,若是发现该任务项的curr_server表示的manager不存在,则设置null。而后对全部的任务分片从新分配调度服务器,具体算法以下:
public void assignTaskItem(String taskType, String currentUuid,int maxNumOfOneServer, List<String> taskServerList) throws Exception { if(this.isLeader(currentUuid,taskServerList)==false){ if(log.isDebugEnabled()){ log.debug(currentUuid +":不是负责任务分配的Leader,直接返回"); } return; } if(log.isDebugEnabled()){ log.debug(currentUuid +":开始从新分配任务......"); } if(taskServerList.size()<=0){ //在服务器动态调整的时候,可能出现服务器列表为空的清空 return; } String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(taskType); String zkPath = this.PATH_BaseTaskType + "/" + baseTaskType + "/" + taskType + "/" + this.PATH_TaskItem; List<String> children = this.getZooKeeper().getChildren(zkPath, false); // Collections.sort(children); // 20150323 有些任务分片,业务方实际上是用数字的字符串排序的。优先以数字进行排序,不然以字符串排序 Collections.sort(children,new Comparator<String>(){ public int compare(String u1, String u2) { if(StringUtils.isNumeric(u1) && StringUtils.isNumeric(u2)){ int iU1= Integer.parseInt(u1); int iU2= Integer.parseInt(u2); /*if(iU1==iU2){ return 0 ; }else if(iU1>iU2){ return 1 ; }else{ return -1; }*/ return iU1-iU2; }else{ return u1.compareTo(u2); } } }); int unModifyCount =0; int[] taskNums = ScheduleUtil.assignTaskNumber(taskServerList.size(), children.size(), maxNumOfOneServer); int point =0; int count = 0; String NO_SERVER_DEAL = "没有分配到服务器"; for(int i=0;i <children.size();i++){ String name = children.get(i); if(point <taskServerList.size() && i >= count + taskNums[point]){ count = count + taskNums[point]; point = point + 1; } String serverName = NO_SERVER_DEAL; if(point < taskServerList.size() ){ serverName = taskServerList.get(point); } byte[] curServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/cur_server",false,null); byte[] reqServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/req_server",false,null); if(curServerValue == null || new String(curServerValue).equals(NO_SERVER_DEAL)){ //对没有分配的任务分片,添加调度服务器 this.getZooKeeper().setData(zkPath + "/" + name + "/cur_server",serverName.getBytes(),-1); this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",null,-1); }else if(new String(curServerValue).equals(serverName)==true && reqServerValue == null ){ //不须要作任何事情 当前执行的调度器正好和从新分配的调度器一致 unModifyCount = unModifyCount + 1; }else{ //调度服务器请求转换 this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",serverName.getBytes(),-1); } } if(unModifyCount < children.size()){ //设置须要全部的服务器从新装载任务 log.info("设置须要全部的服务器从新装载任务:updateReloadTaskItemFlag......"+taskType+ " ,currentUuid "+currentUuid ); //设置/server[v.2][reload=true] this.updateReloadTaskItemFlag(taskType); } if(log.isDebugEnabled()){ StringBuffer buffer = new StringBuffer(); for(ScheduleTaskItem taskItem: this.loadAllTaskItem(taskType)){ buffer.append("\n").append(taskItem.toString()); } log.debug(buffer.toString()); } }
在第4点附上的源码最后有个initial();操做,首先启动一个独立的线程,判断isRuntimeInfoInitial标志位判断是否已经初始化数据,若是没有则leader调度器执行initialRunningInfo(),删除/TaskItem目录,根据ScheduleTaskType,获取到的任务项数组,建立任务项节点,同时在/taskItem下设置leader数据。initial()源码以下:
public void initial() throws Exception{ new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){ @SuppressWarnings("static-access") public void run(){ try{ log.info("开始获取调度任务队列...... of " + currenScheduleServer.getUuid()); //并发启动调度管理器,直至leader初始化任务项完成 while (isRuntimeInfoInitial == false) { if(isStopSchedule == true){ log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid()); return; } //log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial); try{ initialRunningInfo(); //在/taskitem下的数据判断是否为leader的数据 isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss( currenScheduleServer.getBaseTaskType(), currenScheduleServer.getOwnSign()); }catch(Throwable e){ //忽略初始化的异常 log.error(e.getMessage(),e); } if(isRuntimeInfoInitial == false){ Thread.currentThread().sleep(1000); } } int count =0; lastReloadTaskItemListTime = scheduleCenter.getSystemTime(); //此处会给currentTaskItemList添加元素,直至加载到任务 while(getCurrentScheduleTaskItemListNow().size() <= 0){ if(isStopSchedule == true){ log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid()); return; } Thread.currentThread().sleep(1000); count = count + 1; // log.error("尝试获取调度队列,第" + count + "次 ") ; } String tmpStr ="TaskItemDefine:"; for(int i=0;i< currentTaskItemList.size();i++){ if(i>0){ tmpStr = tmpStr +","; } tmpStr = tmpStr + currentTaskItemList.get(i); } log.info("获取到任务处理队列,开始调度:" + tmpStr +" of "+ currenScheduleServer.getUuid()); //任务总量 taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size(); //只有在已经获取到任务处理队列后才开始启动任务处理器 computerStart(); }catch(Exception e){ log.error(e.getMessage(),e); String str = e.getMessage(); if(str.length() > 300){ str = str.substring(0,300); } startErrorInfo = "启动处理异常:" + str; } } }.start(); }
最后的computerStart()方法是实现周期执行的关键,TBSchedule基于cronExpression表达式实现周期性调度,执行类型分为两种TYPE_PAUSE,TYPE_RESUME。并更新setNextRunStartTime和setNextRunEndTime。
/** * 开始的时候,计算第一次执行时间 * @throws Exception */ public void computerStart() throws Exception{ //只有当存在可执行队列后再开始启动队列 boolean isRunNow = false; if(this.taskTypeInfo.getPermitRunStartTime() == null){ isRunNow = true; }else{ String tmpStr = this.taskTypeInfo.getPermitRunStartTime(); if(tmpStr.toLowerCase().startsWith("startrun:")){ isRunNow = true; tmpStr = tmpStr.substring("startrun:".length()); } CronExpression cexpStart = new CronExpression(tmpStr); Date current = new Date( this.scheduleCenter.getSystemTime()); Date firstStartTime = cexpStart.getNextValidTimeAfter(current); this.heartBeatTimer.schedule( new PauseOrResumeScheduleTask(this,this.heartBeatTimer, PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr), firstStartTime); this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime)); if( this.taskTypeInfo.getPermitRunEndTime() == null || this.taskTypeInfo.getPermitRunEndTime().equals("-1")){ this.currenScheduleServer.setNextRunEndTime("当不能获取到数据的时候pause"); }else{ try { String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime(); CronExpression cexpEnd = new CronExpression(tmpEndStr); Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime); Date nowEndTime = cexpEnd.getNextValidTimeAfter(current); if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){ isRunNow = true; firstEndTime = nowEndTime; } this.heartBeatTimer.schedule( new PauseOrResumeScheduleTask(this,this.heartBeatTimer, PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr), firstEndTime); this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime)); } catch (Exception e) { log.error("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e); throw new Exception("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e); } } } //若是没有getPermitRunStartTime,则跳过timer调度,当即执行 if(isRunNow == true){ this.resume("开启服务当即启动"); } this.rewriteScheduleInfo(); }
从上面的代码中,咱们注意到了这个调度使用同一个timer对象,每次调度执行后在timer添加新的调度task。若是是PAUSE类型调度,则执行manager.pause("到达终止时间,pause调度"),若是是RESUME,则执行manager.resume("到达开始时间,resume调度");,并计算下次调度时间,从新添加到调度队列。具体实现以下:
class PauseOrResumeScheduleTask extends java.util.TimerTask { private static transient Logger log = LoggerFactory .getLogger(HeartBeatTimerTask.class); public static int TYPE_PAUSE = 1; public static int TYPE_RESUME = 2; TBScheduleManager manager; Timer timer; int type; String cronTabExpress; public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) { this.manager = aManager; this.timer = aTimer; this.type = aType; this.cronTabExpress = aCronTabExpress; } public void run() { try { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); this.cancel();//取消调度任务 Date current = new Date(System.currentTimeMillis()); CronExpression cexp = new CronExpression(this.cronTabExpress); Date nextTime = cexp.getNextValidTimeAfter(current); if(this.type == TYPE_PAUSE){ manager.pause("到达终止时间,pause调度"); this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime)); }else{ manager.resume("到达开始时间,resume调度"); this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime)); } this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime); } catch (Throwable ex) { log.error(ex.getMessage(), ex); } } }
resume即在可执行时间区间恢复调度,根据SchduleTaskType配置的处理器类型模式Sleep或者NotSleep来初始化处理器。默认使用TBScheduleProcessorSleep处理器。
/** * 处在了可执行的时间区间,恢复运行 * @throws Exception */ public void resume(String message) throws Exception{ if (this.isPauseSchedule == true) { if(log.isDebugEnabled()){ log.debug("恢复调度:" + this.currenScheduleServer.getUuid()); } this.isPauseSchedule = false; this.pauseMessage = message; if (this.taskDealBean != null) { if (this.taskTypeInfo.getProcessorType() != null && this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){ this.taskTypeInfo.setProcessorType("NOTSLEEP"); this.processor = new TBScheduleProcessorNotSleep(this, taskDealBean,this.statisticsInfo); }else{ this.processor = new TBScheduleProcessorSleep(this, taskDealBean,this.statisticsInfo); this.taskTypeInfo.setProcessorType("SLEEP"); } } rewriteScheduleInfo();//更新心跳信息 } }
6)多线程执行,TBScheduleProcessorSleep是一个Runnable对象,多个调度线程共享以下变量:
final LockObject m_lockObject = new LockObject(); //缓存线程对象 List<Thread> threadList = new CopyOnWriteArrayList<Thread>(); /** * 任务管理器 */ protected TBScheduleManager scheduleManager; /** * 任务类型 */ ScheduleTaskType taskTypeInfo; /** * 任务处理的接口类 */ protected IScheduleTaskDeal<T> taskDealBean; /** * 当前任务队列的版本号 */ protected long taskListVersion = 0; final Object lockVersionObject = new Object(); final Object lockRunningList = new Object(); //任务队列 protected List<T> taskList = new CopyOnWriteArrayList<T>(); /** * 是否能够批处理 */ boolean isMutilTask = false; /** * 是否已经得到终止调度信号 */ boolean isStopSchedule = false;// 用户中止队列调度 boolean isSleeping = false;
在初始化执行处理器,会启动ThreadNumber个线程数,
for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
this.startThread(i);
}
下面具体看一下线程的run()操做。一个执行线程的职责主要是执行自定义的IScheduleTaskDealSingle,而IScheduleTaskDealMulti能够实现批量处理,实现区别也是大同小异。其核心思想:
对开始执行的线程计数+1,在没有中止调度的前提下即resume状态下,执行客户自定义ScheduleTask的execute()方法,并完成执行统计。当任务队列中的全部任务Item都执行完成,队列为空时,若是正在执行任务的线程数不是最后一个线程,则等待。反之,则加载任务,有数据唤醒全部等待线程继续执行,没数据线程sleep SleepTimeNoData时间,并继续加载任务数据。
public void run(){ try { long startTime =0; while(true){ this.m_lockObject.addThread(); Object executeTask; while (true) { if(this.isStopSchedule == true){//中止队列调度 this.m_lockObject.realseThread(); this.m_lockObject.notifyOtherThread();//通知全部的休眠线程 synchronized (this.threadList) { this.threadList.remove(Thread.currentThread()); if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } } return; } //加载调度任务 if(this.isMutilTask == false){ executeTask = this.getScheduleTaskId(); }else{ executeTask = this.getScheduleTaskIdMulti(); } if(executeTask == null){ break; } try {//运行相关的程序 startTime =scheduleManager.scheduleCenter.getSystemTime(); if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) { addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } else { if (((IScheduleTaskDealMulti) this.taskDealBean) .execute((Object[]) executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } }catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1,scheduleManager.scheduleCenter.getSystemTime()- startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime() - startTime, "TBScheduleProcessor.run"); } logger.warn("Task :" + executeTask + " 处理失败", ex); } } //当前队列中全部的任务都已经完成了。 if(logger.isTraceEnabled()){ logger.trace(Thread.currentThread().getName() +":当前运行线程数量:" +this.m_lockObject.count()); } if (this.m_lockObject.realseThreadButNotLast() == false) { int size = 0; Thread.currentThread().sleep(100); startTime =scheduleManager.scheduleCenter.getSystemTime(); // 装载数据 size = this.loadScheduleData(); if (size > 0) { this.m_lockObject.notifyOtherThread(); } else { //判断当没有数据的是否,是否须要退出调度 if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){ if(logger.isTraceEnabled()){ logger.trace("没有装载到数据,start sleep"); } this.isSleeping = true; Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData()); this.isSleeping = false; if(logger.isTraceEnabled()){ logger.trace("Sleep end"); } }else{ //没有数据,退出调度,唤醒全部沉睡线程 this.m_lockObject.notifyOtherThread(); } } this.m_lockObject.realseThread(); } else {// 将当前线程放置到等待队列中。直到有线程装载到了新的任务数据 if(logger.isTraceEnabled()){ logger.trace("不是最后一个线程,sleep"); } this.m_lockObject.waitCurrentThread(); } } } catch (Throwable e) { logger.error(e.getMessage(), e); } }
列如存在上图目录节点,原有的查找节点数有些问题,不能彻底删除目录。我这边提供的思想是递归查找目录树。最终结果为A-B-E-C-F-D,删除节点的时候从最后一个节点删除,不会出现子目录存在而直接删除父节点的操做。代码以下:
/** * 使用递归遍历全部结点 * * @param zk * @param path * @param dealList * @throws Exception * @throws InterruptedException */ private static void getTree(ZooKeeper zk, String path, List<String> dealList) throws Exception, InterruptedException { //添加父目录 dealList.add(path); List<String> children = zk.getChildren(path, false); if (path.charAt(path.length() - 1) != '/') { path = path + "/"; } //添加子目录 for (int i = 0; i < children.size(); i++) { getTree(zk, path + children.get(i), dealList); } }
经过上面TBSchedule的源码分析,咱们知道一个任务调度处理器,会建立一个timer根据cron表达式执行resume和pause操做。每一次resume都会建立TBScheduleProcessorSleep,而后初始化多个线程。
当该timer进行N次调度resume的时候,也就是系统会建立N*threadNum个线程,执行pause操做,则这些线程将会销毁。个人建议是每个任务调度处理器,都指定1个线程数的cacheThreadPool线程池。可能会有人说,为什么不指定一个ThreadNum数的fixedThreadPool。由于当timer执行屡次resume的时候,若是上一次的resume尚未完成,线程池中没有空闲的线程来执行新的task,会形成线程依赖而下一调度的超时或者失败。指定cacheThreadPool,根据ThreadNum值向线程池submit ThreadNum个runnable对象。
在任务执行器TBScheduleProcessorSleep中,t经过加载任务item (List<TaskItemDefine> taskItems),执行taskDealBean.selectTasks方法。获取到的数据存放在CopyOnWriteArrayList中。这里简单的介绍下写时拷贝容器CopyOnWriteArrayList,其对并发读不会加锁,而对并发写同步,当有一个写请求,首先获取锁 ,而后复制当前容器内数据,进行增删改,最后替换掉原有的数组引用,从而达到现场安全的目的,实际上该容器很是适合读多写少的场景。而目前的场景并无读get的操做。获取容器元素调用remove()方法,一样获取锁。
既然读已经同步,那么在获取任务的时候,就不须要加synchronized关键字了。原有代码以下:
public synchronized Object getScheduleTaskId() { //能够去除synchronized
if (this.taskList.size() > 0) return this.taskList.remove(0); // 按正序处理 return null; }
整个TBSchedule的调度,默认两秒内会执行refresh()操做,中止全部的任务调度器而后从新建立新的任务调度器。这样的好处可使得某一个调度节点宕机,或者网络缘由致使心跳失败,再或者在控制台修改了调度策略配置信息。能够动态的生效。可是若是可以基于ZK的watch机制,对系统的消耗会更小。因为在factory目录下建立的都是瞬时节点,若是某一个server宕机。zk会watch到相应的事件。一样,在ScheduleTaskType下的数据发生改变,zk一样能够watch到相应的事件。若是发现出现了上述几种状况,那么TBSchedule能够执行refresh()操做了。
TBSchedule的使用场景仍是很是普遍,如定时数据同步,日志上报等等。不一样于quartz的抢占式任务调度,TBSchedule更侧重于任务多分片并行处理,基于分布式集群提升任务处理能力。知其然且知其因此然有助于更好的使用框架,并解决实际问题。
更多资料:http://geek.csdn.net/news/detail/65738
源码:http://code.taobao.org/p/tbschedule/src/
,