sleep模式: java
当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。若是是,则本身休眠; 若是其它线程都已经由于没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取须要处理的任务,放入任务池中, 同时唤醒其它休眠线程开始工做。 数据库
调度管理的resume方法是程序入口,咱们来看看。 安全
/** * 处在了可执行的时间区间,恢复运行 * @throws Exception */ public void resume(String message) throws Exception{ if (this.isPauseSchedule == true) { if(log.isDebugEnabled()){ log.debug("恢复调度:" + this.currenScheduleServer.getUuid()); } this.isPauseSchedule = false; this.pauseMessage = message; if (this.queueDealTask != null) { if (this.taskTypeInfo.getProcessorType() != null && this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){ this.taskTypeInfo.setProcessorType("NOTSLEEP"); this.processor = new TBScheduleProcessorNotSleep(this, queueDealTask,this.statisticsInfo); }else{ this.processor = new TBScheduleProcessorSleep(this, queueDealTask,this.statisticsInfo); this.taskTypeInfo.setProcessorType("SLEEP"); } } rewriteScheduleInfo(); } }
咱们进入类TBScheduleProcessorSleep的构造函数看看,它作了些什么。 多线程
public TBScheduleProcessorSleep(TBScheduleManager aManager, IScheduleTaskDeal<T> aQueueTask, StatisticsInfo aStatisticsInfo) throws Exception { this.scheduleManager = aManager; this.statisticsInfo = aStatisticsInfo; this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo(); this.taskDealProcessor = aQueueTask; if (this.taskDealProcessor instanceof IScheduleTaskDealSingle<?>) { if (taskTypeInfo.getExecuteNumber() > 1) { taskTypeInfo.setExecuteNumber(1); } isMutilTask = false; } else { isMutilTask = true; } if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) { logger.warn("参数设置不合理,系统性能不佳。【每次从数据库获取的数量fetchnum】 >= 【线程数量threadnum】 *【最少循环次数10】 "); } for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) { this.startThread(i); } }
最后根据任务类中的ThreadNumber值来启动对应数目的线程数。 并发
private void startThread(int index) { Thread thread = new Thread(this); threadList.add(thread); String threadName = this.scheduleManager.getTaskTypeRunningInfo().getTaskType()+"-" + this.scheduleManager.getCurrentSerialNumber() + "-exe" + index; thread.setName(threadName); thread.start(); }
建立一个线程,而且启动该线程,传递的Runnable对象就是本对象,则咱们看看本对象中的run()方法就是线程要运行的方法。 函数
@SuppressWarnings({ "rawtypes", "unchecked", "static-access" }) public void run(){ try { long startTime =0; while(true){ this.m_lockObject.addThread(); Object executeTask; while (true) { if(this.isStopSchedule == true){//中止队列调度 this.m_lockObject.realseThread(); this.m_lockObject.notifyOtherThread();//通知全部的休眠线程 this.threadList.remove(Thread.currentThread()); if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } return; } //加载调度任务 if(this.isMutilTask == false){ executeTask = this.getScheduleTaskId(); }else{ executeTask = this.getScheduleTaskIdMulti(); } if(executeTask == null){ break; } try {//运行相关的程序 startTime =ScheduleUtil.getCurrentTimeMillis(); if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } else { if (((IScheduleTaskDealMulti) this.taskDealProcessor) .execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } }catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } logger.warn("Task :" + executeTask + " 处理失败", ex); } } //当前队列中全部的任务都已经完成了。 if(logger.isTraceEnabled()){ logger.trace(Thread.currentThread().getName() +":当前运行线程数量:" +this.m_lockObject.count()); } if (this.m_lockObject.realseThreadButNotLast() == false) { int size = 0; Thread.currentThread().sleep(100); startTime = ScheduleUtil.getCurrentTimeMillis(); // 装载数据 size = this.loadScheduleData(); if (size > 0) { this.m_lockObject.notifyOtherThread(); } else { //判断当没有数据的是否,是否须要退出调度 if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){ if(logger.isTraceEnabled()){ logger.trace("没有装载到数据,start sleep"); } this.isSleeping = true; Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData()); this.isSleeping = false; if(logger.isTraceEnabled()){ logger.trace("Sleep end"); } }else{ //没有数据,退出调度,唤醒全部沉睡线程 this.m_lockObject.notifyOtherThread(); } } this.m_lockObject.realseThread(); } else {// 将当前线程放置到等待队列中。直到有线程装载到了新的任务数据 if(logger.isTraceEnabled()){ logger.trace("不是最后一个线程,sleep"); } this.m_lockObject.waitCurrentThread(); } } } catch (Throwable e) { logger.error(e.getMessage(), e); } }
该方法有两重while(true)循环。 源码分析
1.跳出循环的点一个是当接受到中止调度的指令的时候,会跳出整个run方法。 性能
2.若线程没有加在到执行任务,则会中断内层while(true)循环。若不是最后一个线程,则线程会处于等待状态。使用的是Object.wait()方法。等待其它线程唤醒。 fetch
3.如果最后一个线程,则不会进入等待状态,而是执行方法loadScheduleData加载新的任务。若是加载到任务则唤醒其它线程开始工做。调用Object.notifyAll()方法唤醒其它线程。
ui
若是没有加载到数据,则本身也会睡眠一个周期,等待数据准备好。
加载任务的两个方法的源码分别以下。
public synchronized Object getScheduleTaskId() { if (this.taskList.size() > 0) return this.taskList.remove(0); // 按正序处理 return null; } public synchronized Object[] getScheduleTaskIdMulti() { if (this.taskList.size() == 0){ return null; } int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber() : taskList.size(); Object[] result = new Object[size]; for(int i=0;i<size;i++){ result[i] = this.taskList.remove(0); // 按正序处理 } return result; }
Not sleep模式:
当一个线程任务处理完毕,从任务池中取不到任务的时候,当即调用业务接口获取须要处理的任务,放入任务池中。
构造对象TBScheduleProcessorNotSleep的方法与睡眠处理器实现相似,再也不列出代码。
初始化和建立线程、启动线程的代码也是相似的,咱们就再也不细看,咱们只看不同的地方是run()方法。
/** * 运行函数 */ @SuppressWarnings("unchecked") public void run() { long startTime = 0; long sequence = 0; Object executeTask = null; while (true) { try { if (this.isStopSchedule == true) { // 中止队列调度 this.threadList.remove(Thread.currentThread());///threadList中的线程是在startThread中添加的,new完线程后启动以前加入 if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } return; } // 加载调度任务 if (this.isMutilTask == false) { ///从上次selectTask的结果中取数据,装任务的taskList是线程安全的,且取的同时删掉,因而同一个实例内部的线程不会取到相同数据. executeTask = this.getScheduleTaskId(); } else { executeTask = this.getScheduleTaskIdMulti(); } ///若是已加载数据处理完,则再加载.NoSleep类型任务的特色 if (executeTask == null ) { this.loadScheduleData(); continue; } try { // 运行相关的程序 this.runningTaskList.add(executeTask); startTime = ScheduleUtil.getCurrentTimeMillis(); sequence = sequence + 1;///没用上 if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle<Object>) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } else { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } } else { if (((IScheduleTaskDealMulti<Object>) this.taskDealProcessor) .execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } } } catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } logger.error("Task :" + executeTask + " 处理失败", ex); } finally { this.runningTaskList.remove(executeTask); } } catch (Throwable e) { throw new RuntimeException(e); //log.error(e.getMessage(), e); } } }
/** * 获取单个任务,注意lock是必须, * 不然在maybeRepeatTaskList的数据处理上会出现冲突 * @return */ public T getScheduleTaskId() { lockFetchID.lock(); try { T result = null; while (true) { if (this.taskList.size() > 0) { result = this.taskList.remove(0); // 按正序处理 } else { return null; } if (this.isDealing(result) == false) {///检查是否是在maybeRepeatTaskList里面的,这句话在sleep方式里面没有,所以任务的比较器只有在NotSleep模式下须要用到 return result; } } } finally { lockFetchID.unlock(); } }
@SuppressWarnings("unchecked") protected boolean isDealing(T aTask) { if (this.maybeRepeatTaskList.size() == 0) { return false; } T[] tmpList = (T[]) this.maybeRepeatTaskList.toArray(); for (int i = 0; i < tmpList.length; i++) { if(this.taskComparator.compare(aTask, tmpList[i]) == 0){///在本身定义的任务类中定义getComparator就是干这个用的 this.maybeRepeatTaskList.remove(tmpList[i]); return true; } } return false; }
判断任务是否在处理中的方法,是直接与maybeRepeatTaskList集合中的对象进行对比,而且调用taskComparator这个实现的方法进行对比,所以该接口的正确性,直接影响着去重的效果。