1、下载TBSchedule 源码java
http://code.taobao.org/svn/tbschedule/sql
2、编译TBSchedule源码造成jar包数据库
mvn package/直接mvn deploye/install 一步完成服务器
3、安装到本地的maven仓库session
mvn deploy/installjvm
4、在项目pom配置文件中引用这个依赖文件maven
<dependency>ide
<groupId>com.taobao.pamirs.schedule</groupId>svn
<artifactId>tbschedule</artifactId>fetch
<version>3.3.3.2</version>
</dependency>
5、配置TBSchedule依赖的zookeeper配置,TBSchedule的调度依赖于zookeeper的节点配置和心跳连接。
# zooker service address
schedule.zookeeper.address=120.25.87.176:2181
# root path
schedule.root.path=/bbb/dd
# session timeout
schedule.session.timeout=60000
# userName
schedule.zookeeper.username=ScheduleAdmin
# password
schedule.zookeeper.password=password
<context:property-placeholder location="classpath:schedule.properties" ignore-resource-not-found="true" ignore-unresolvable="true"/>
<bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.path}" />//在控制台链接zookeeper的路径要和该配置的路径一致,因为配置时默认要检查父节点是否为空,因此最好先检查该路径是否存在。
<entry key="zkSessionTimeout" value="${schedule.session.timeout}" />
<entry key="userName" value="${schedule.zookeeper.username} 必须和控制台的用户名同样" />
<entry key="password" value="${schedule.zookeeper.password} 必须和" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
6、配置任务项和任务策略,将任务分片,查询任务
六-1、经过代码实现任务的分配
private Logger log = LoggerFactory
.getLogger(getClass());
TBScheduleManagerFactory scheduleManagerFactory;
/**
* 心跳链接时间
*/
private int heartBeatRate = 5*1000;
private int judgeDeadInterval = 1*60*1000;
private String taskParameter="";
private String strategyTaskParameter="";
/**
* jvm最大单线程数量
*/
private int numOfSingleServer = 2;
/**
* 最大线程组数量 总 最大线程数量 = jvm最大单线程数量 * 最大的线程组总量
* 将任务列表分红几个队列来处理
*/
private int assignNum = 2;
private String[] iPLists={"127.0.0.1"};
private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};
private String[] baseTaskTypeNames;
private String[] dealBeanNames;
/**
* 开始时间
*/
private String permitRunStartTime;
/**
* 结束时间
*/
private String permitRunEndTime;
/**
* 批处理时 每次处理任务的数量
*/
private int executeNumber = 1;
/**
* 处理完一批数据后的休息时间
*/
private int sleepTimeInterval = 0;
/**
* 采集不到数据的休眠时间
*/
private int sleepTimeNoData = 500;
/**
* 每次采集任务的数量
*/
private int fetchDataNumber = 500;
@Autowired
public void setScheduleManagerFactory(
TBScheduleManagerFactory tbScheduleManagerFactory) {
this.scheduleManagerFactory = tbScheduleManagerFactory;
}
public void initialConfigData() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
if(baseTaskTypeNames.length != dealBeanNames.length) {
throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");
}
String baseTaskTypeName;
String dealBeanName;
for (int i = 0; i < baseTaskTypeNames.length; i++) {
baseTaskTypeName = baseTaskTypeNames[i];
dealBeanName= dealBeanNames[i];
configTasks(baseTaskTypeName, dealBeanName);
}
}
private void configTasks(String baseTaskTypeName, String dealBeanName)
throws Exception, InterruptedException {
while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){
Thread.sleep(1000);
}
scheduleManagerFactory.stopServer(null);
Thread.sleep(1000);
try {
this.scheduleManagerFactory.getScheduleDataManager()
.deleteTaskType(baseTaskTypeName);
} catch (Exception e) {
}
ScheduleTaskType baseTaskType = new ScheduleTaskType();
baseTaskType.setBaseTaskType(baseTaskTypeName);
baseTaskType.setDealBeanName(dealBeanName);
baseTaskType.setHeartBeatRate(heartBeatRate);
baseTaskType.setJudgeDeadInterval(judgeDeadInterval);
baseTaskType.setTaskParameter(taskParameter);
baseTaskType.setTaskItems(taskItems);
baseTaskType.setPermitRunStartTime(permitRunStartTime);
baseTaskType.setPermitRunEndTime(permitRunEndTime);
baseTaskType.setExecuteNumber(executeNumber);
baseTaskType.setSleepTimeInterval(sleepTimeInterval);
baseTaskType.setSleepTimeNoData(sleepTimeNoData);
baseTaskType.setFetchDataNumber(fetchDataNumber);
this.scheduleManagerFactory.getScheduleDataManager()
.createBaseTaskType(baseTaskType);
String taskName = baseTaskTypeName + "$TEST";
String strategyName = baseTaskTypeName +"-Strategy";
try {
this.scheduleManagerFactory.getScheduleStrategyManager()
.deleteMachineStrategy(strategyName,true);
} catch (Exception e) {
e.printStackTrace();
}
ScheduleStrategy strategy = new ScheduleStrategy();
strategy.setStrategyName(strategyName);
strategy.setKind(ScheduleStrategy.Kind.Schedule);
strategy.setTaskName(taskName);
strategy.setTaskParameter(strategyTaskParameter);
strategy.setNumOfSingleServer(numOfSingleServer);
strategy.setAssignNum(assignNum);
strategy.setIPList(iPLists);
this.scheduleManagerFactory.getScheduleStrategyManager()
.createScheduleStrategy(strategy);
log.info("建立调度任务成功" + strategy.toString());
}
六-2、经过TBSchedule自带的控制台来实现任务的配置
7、任务的理解
一、同一个jvm中,不一样线程之间如何防止任务被重复执行?一个scheduleServer的内部线程间如何进行任务分片?
答复:一、数据分片是在不一样的jvm,获知同一个jvm中不一样的线程组间起做用。在同一个线程组内的10个线程,是经过一个同步的任务队列来实现的。二、每一个线程从队列中取任务执行,若是没有任务了,则由一个线程负责调用selectTasks方法再获取一批新的任务。
主要是设置休眠时间:即selectTasks方法返回列表的size为0后,进入休眠。休眠完成以后从新执行该定时任务
二、任务项设置的意义和selectTasks方法的参数含义
答复:一、 任务项(0,1,2,3,4,5,6,7,8,9)就是任务分片的策略。这个配置就是把数据分红10片。能够表示 ID的最后一位,也能够是一个独立的字段。根据你的业务来定。
二、 若是只有1组线程,则全部的任务片都分配给他。这时selectTasks方法的参数:taskItemNum =10, queryCondition由10个元素,分别对应0,1,2,3,4,5,6,7,8,9。
a) 若是只有2组线程,则任务片被分红两份。这时
b) 一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(0,2,4,6,8)
三、 另一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(1,3,5,7,9)
四、 若是有10个线程组。则每组线程只会获取到1个任务片。这时selectTasks方法的参数:taskItemNum =10, queryCondition只有一个元素,对应0到9中的一个。
一、 执行期间和时间的修改功能 a) 在建立任务和修改任务的时候,有两个属性(执行开始时间,执行结算时间)用于控制任务的执行时间。 b) 时间格式遵循标准的cron格式 http://dogstar.javaeye.com/blog/116130 还加强了原来不支持的倒数第几天的能力。 c) 当时间到底开始时间的时候,就开始执行任务,到达结束时间则终止调度(不论是否全部的任务都处理完)。若是没有设置执行结束时间。则一直运行,直到selectTasks返回的记录数为0,就终止执行。等待下个开始运行时间在启动。 d) 若是要动态修改任务的执行时间区间,则先 点击“暂停”按钮,等全部的服务器都中止完毕(大概须要几秒时间)。当再次单击任务,出现以下情形表示中止完毕。而后修改执行开始时间,执行结算时间。在恢复任务调度,就能够实现调度时间的修改 二、 任务处理的问题 a) Schedule主要是提供任务调度的分配管理。每个任务是否执行成功,是经过业务方的bean来实现的。 b) 你需求的例子,我理解的解决方案以下: i. 你从云梯拉下来100万数据放到保险应用的数据库中。这个表中有两个关键字段USER_ID和STS(状态 0-未发送,1-已发送) ii. 在bean的selectTasks方法的查询sql中除了根据任务进行分片外,还须要增长状态条件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在发送完消息后,你还须要 修改数据状态 update table STS =1 where USER_ID =? 。这样下次就不会取到这条数据了。 iv. 这样就能够保障机器从新启动后,也不会出现问题。你能够参考DBDemoSingle.java的实现模式。你使用的接口应该是IScheduleTaskDealSingle。 若是旺旺的接口支持批量发送消息的时候,你才须要使用IScheduleTaskDealMulti接口。