TBSchedule是来自淘宝的分布式调度开源框架,基于Zookeeper纯Java实现,其目的是让一种批量任务或者不断变化的任务,可以被动态的分配到多个主机的JVM中的不一样线程组中并行执行。全部的任务可以被不重复,不遗漏的快速处理。这种框架任务的分配经过分片实现了不重复调度,又经过架构中Leader的选择,存活的自我保证,完成了可用性和伸缩性的保障。
TBSchedule源码地址:http://code.taobao.org/p/tbschedule/src/
1.安装zookeeperhtml
(1)下载zookeeperjava
http://zookeeper.apache.org/releases.htmlnode
下载3.4.11版本:web
http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz算法
(2)解压至c:/prog/zookeeper/zookeeper-3.4.11spring
复制conf下的zoo_sample.cfg为zoo.cfg数据库
修改dataDir为:apache
dataDir=/prog/zookeeper/data数组
tickTime单位为毫秒,为心跳间隔和最小的心跳超时间隔浏览器
clientPort是监听客户端链接的端口,默认为2181
(3)建立目录:c:/prog/zookeeper/data
2.启动zookeeper
运行bin/zkServer.cmd
若是在Linux下,则执行:
[root@192.168.1.5]$ ./zkServer start
3.下载TBSchedule
采用svn来Checkout TBSchedule
svn地址:http://code.taobao.org/svn/tbschedule/
4.在Eclipse中导入项目:
右键工程区域(Package Explorer)->Import...->Maven-Existing Maven Projects
注意:TBSchedule编码为GBK,但引用TBSchedule的工程编码为UTF-8时,此处也要将TBSchedule工程的编码设置为UTF-8。
5.安装Tomcat
(1)下载Tomcat
地址:https://tomcat.apache.org/download-80.cgi#8.5.27
(2)解压Tomcat 8.5至c:\prog\tomcat\apache-tomcat-8.5.11
6.配置TBSchedule控制台
(1)将TBSchedule工程中的console\ScheduleConsole.war拷贝至tomcat/webapps中
(2)启动tomcat
(3)浏览器中打开:
http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true
点击保存会提示:
错误信息:Zookeeper connecting ......localhost:2181
如配置正确则能够忽略上述提示,直接进入“管理主页...”。
7.查看zookeeper中节点
运行zookeeper下的bin/zkClient.cmd
输入ls /app-schedule/demo,显示:
[strategy, baseTaskType, factory]
说明已经建立znode成功。
查看TBSchedule控制台中的“Zookeeper数据”,也能看到相同数据。
8.在项目中使用TBSchedule
Eclipse中新建一个maven工程tbsdemo
GroupId:com.jf
Artifact Id:tbsdemo
9.在pom.xml中引入Spring、TBSchedule、Zookeeper
pom.xml内容为:
<project xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0
.
0
</modelVersion>
<groupId>com.jf</groupId>
<artifactId>tbsdemo</artifactId>
<version>
0.0
.
1
-SNAPSHOT</version>
<packaging>jar</packaging>
<name>tbsdemo</name>
<url>http:
//maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-
8
</project.build.sourceEncoding>
<!-- spring版本号 -->
<spring.version>
4.0
.
5
.RELEASE</spring.version>
<!-- mybatis版本号 -->
<mybatis.version>
3.3
.
0
</mybatis.version>
<!-- log4j日志文件管理包版本 -->
<slf4j.version>
1.7
.
7
</slf4j.version>
<log4j.version>
1.2
.
17
</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>
4.11
</version>
<scope>test</scope>
</dependency>
<!-- spring核心包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>
3.4
.
11
</version>
</dependency>
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>
3.3
.
3.2
</version>
</dependency>
</dependencies>
</project>
|
10.在src/main/resources下建立applicationContext.xml,输入:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:p=
"http://www.springframework.org/schema/p"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<bean id=
"scheduleManagerFactory"
class
=
"com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method=
"init"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
</beans>
|
11.建立TBSchedule配置文件
在src/main/resources/中建立tbschedule.propertie
输入:
#注册中心地址
schedule.zookeeper.address=localhost:
2181
#定时任务根目录,任意指定,调度控制台配置时对应
schedule.root.catalog=/app-schedule/demo
#帐户,任意指定,调度控制台配置时对应
schedule.username=admin
#密码,任意指定,调度控制台配置时对应
schedule.password=password
#超时配置
schedule.timeout=
60000
|
注意schedule.username、schedule.password要与TBSchedule控制台中设置的一致。
12.建立任务数据类TaskModel:
package
com.jf.tbsdemo.pojo;
public
class
TaskModel {
private
long
id;
private
String taskInfo;
public
TaskModel(
long
id, String taskInfo) {
this
.id = id;
this
.taskInfo = taskInfo;
}
public
long
getId() {
return
id;
}
public
void
setId(
long
id) {
this
.id = id;
}
public
String getTaskInfo() {
return
taskInfo;
}
public
void
setTaskInfo(String taskInfo) {
this
.taskInfo = taskInfo;
}
}
|
13.建立任务处理类IScheduleTaskDealSingleTest:
注意:任务处理分单任务和多任务(批处理),分别实现IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法参数只有一个任务T,然后者的execute()方法参数为List<T>,本文使用单任务模式。
package
com.jf.tbsdemo;
import
java.util.ArrayList;
import
java.util.Comparator;
import
java.util.Date;
import
java.util.List;
import
org.apache.log4j.Logger;
import
org.springframework.stereotype.Component;
import
com.jf.tbsdemo.pojo.TaskModel;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.TaskItemDefine;
public
class
IScheduleTaskDealSingleTest
implements
IScheduleTaskDealSingle<TaskModel> {
private
static
final
Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest.
class
);
public
Comparator<TaskModel> getComparator() {
return
null
;
}
public
List<TaskModel> selectTasks(String taskParameter, String ownSign,
int
taskQueueNum,
List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest选择任务列表开始.........."
);
List<TaskModel> models =
new
ArrayList<TaskModel>();
models.add(
new
TaskModel(
1
,
"task1"
));
models.add(
new
TaskModel(
2
,
"task2"
));
return
models;
}
public
boolean
execute(TaskModel model, String ownSign)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest执行开始.........."
+
new
Date());
logger.info(
"任务"
+ model.getId() +
",内容:"
+ model.getTaskInfo());
return
true
;
}
}
|
其中,selectTasks()方法负责取得要处理的任务信息,execute()方法为处理任务的方法。selectTasks()方法能够理解为生产者,execute()方法能够理解为消费者。
14.建立主程序类TaskCenter:
package
com.jf.tbsdemo;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
public
static
void
main(String[] args)
throws
Exception {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
logger.info(
"---------------task start------------------"
);
}
}
|
15.在Eclipse中运行主程序类TaskCenter
16.在TBSchedule中建立任务:
(1)进入TBSchedule的控制台->任务管理
点击“建立新任务…”
(2)配置任务属性:
NOTSLEEP模式下线程在任务池中取不到任务时,将当即调用业务接口获取待处理的任务。
SLEEP模式较为简单,由于取任务的线程同一时间只有一个,不易发生冲突,效率也会较低。NOTSLEEP模式开销较大,也要防止发生重复获取相同任务。
0,1,2,3,4,5,6,7,8,9
17.在TBSchedule中建立调度策略:
(1)进入TBSchedule的控制台->调度策略
点击“建立新策略…”
(2)填写策略属性:
注意任务名称要与新建的任务名称一致。
(3)点击建立,将当即启动调度任务
另外,除了在控制台中配置调度策略、任务,还能够经过经过代码、Spring配置来设置任务调度参数,推荐采用Spring配置方式。
18.代码方式
建立类TaskCenter:
package
com.jf.tbsdemo;
import
java.util.Properties;
import
javax.annotation.Resource;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
// 初始化调度工厂
@Resource
TBScheduleManagerFactory scheduleManagerFactory =
new
TBScheduleManagerFactory();
private
void
startTask() {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
Properties p =
new
Properties();
p.put(
"zkConnectString"
,
"localhost:2181"
);
p.put(
"rootPath"
,
"/app-schedule/demo"
);
p.put(
"zkSessionTimeout"
,
"60000"
);
p.put(
"userName"
,
"admin"
);
p.put(
"password"
,
"password"
);
p.put(
"isCheckParentPath"
,
"true"
);
scheduleManagerFactory.setApplicationContext(ctx);
try
{
scheduleManagerFactory.init(p);
// 建立任务调度任务的基本信息
String baseTaskTypeName =
"DemoTask"
;
ScheduleTaskType baseTaskType =
new
ScheduleTaskType();
baseTaskType.setBaseTaskType(baseTaskTypeName);
baseTaskType.setDealBeanName(
"demoTaskBean"
);
baseTaskType.setHeartBeatRate(
10000
);
baseTaskType.setJudgeDeadInterval(
100000
);
baseTaskType.setTaskParameter(
"AREA=BJ,YEAR>30"
);
baseTaskType.setTaskItems(ScheduleTaskType
.splitTaskItem(
"0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
+
"4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
+
"8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"
));
baseTaskType.setFetchDataNumber(
500
);
baseTaskType.setThreadNumber(
5
);
scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
logger.info(
"建立调度任务成功:"
+ baseTaskType.toString());
// 建立任务的调度策略
String taskName = baseTaskTypeName;
String strategyName = taskName +
"-Strategy"
;
try
{
scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,
true
);
}
catch
(Exception e) {
e.printStackTrace();
}
ScheduleStrategy strategy =
new
ScheduleStrategy();
strategy.setStrategyName(strategyName);
strategy.setKind(ScheduleStrategy.Kind.Schedule);
strategy.setTaskName(taskName);
strategy.setTaskParameter(
"china"
);
strategy.setNumOfSingleServer(
1
);
strategy.setAssignNum(
10
);
strategy.setIPList(
"127.0.0.1"
.split(
","
));
scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
logger.info(
"建立调度策略成功:"
+ strategy.toString());
logger.info(
"---------------task start------------------"
);
}
catch
(Exception e) {
logger.error(
"出现异常"
, e);
}
}
public
static
void
main(String[] args)
throws
Exception {
TaskCenter taskCenter =
new
TaskCenter();
taskCenter.startTask();
}
}
|
19.Spring配置文件方式
(1)增长类AbstractBaseScheduleTask:
package
com.jf.tbsdemo;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
abstract
class
AbstractBaseScheduleTask<T>
implements
IScheduleTaskDealSingle<T> {
/**
* 调度任务的配置
*/
private
ScheduleTaskType scheduleTaskType;
/**
* 调度策略的配置
*/
private
ScheduleStrategy scheduleStrategy;
public
ScheduleTaskType getScheduleTaskType() {
return
scheduleTaskType;
}
public
void
setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
this
.scheduleTaskType = scheduleTaskType;
}
public
ScheduleStrategy getScheduleStrategy() {
return
scheduleStrategy;
}
public
void
setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
this
.scheduleStrategy = scheduleStrategy;
}
}
|
(2)修改IScheduleTaskDealSingleTest:
类声明改成:
public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {
(3)在applicationContext.xml中对声明IScheduleTaskDealSingleTest的Bean并注入参数,内容为:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<!--tbschedule管理器初始化(配置zookeeper,注册调度任务和调度策略)-->
<bean id=
"systemTBScheduleManagerFactory"
class
=
"com.jf.tbsdemo.SystemTBScheduleManagerFactory"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
<bean name=
"scheduleTaskType"
class
=
"com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType"
>
<!-- 心跳频率(毫秒) -->
<property name=
"heartBeatRate"
value=
"5000"
/>
<!-- 假定服务死亡间隔(毫秒) -->
<property name=
"judgeDeadInterval"
value=
"60000"
/>
<!-- 处理模式 -->
<property name=
"processorType"
value=
"SLEEP"
/>
<!-- 线程数 -->
<property name=
"threadNumber"
value=
"5"
/>
<!--容许执行的开始时间-->
<property name=
"permitRunStartTime"
value=
""
/>
<!--容许执行的结束时间-->
<property name=
"permitRunEndTime"
value=
""
/>
<!--当没有数据的时候,休眠的时间-->
<property name=
"sleepTimeNoData"
value=
"3000"
/>
<!--在每次数据处理完后休眠的时间-->
<property name=
"sleepTimeInterval"
value=
"1000"
/>
<!--每次获取数据的数量-->
<property name=
"fetchDataNumber"
value=
"10"
/>
<!--任务项数组-->
<property name=
"taskItems"
>
<list>
<value>
0
:{TYPE=A,KIND=
1
}</value>
<value>
1
:{TYPE=B,KIND=
2
}</value>
<value>
2
:{TYPE=C,KIND=
3
}</value>
</list>
</property>
</bean>
<bean name=
"scheduleStrategy"
class
=
"com.taobao.pamirs.schedule.strategy.ScheduleStrategy"
>
<!--最大线程组数量-->
<property name=
"assignNum"
value=
"9"
/>
<!--单个机器(JVM)的线程组数量-->
<property name=
"numOfSingleServer"
value=
"3"
/>
<!--策略运行的机器(JVM)IP-->
<property name=
"IPList"
>
<list>
<value>
127.0
.
0.1
</value>
</list>
</property>
</bean>
<!--任务simpleTask-->
<bean id=
"simpleTask"
class
=
"com.jf.tbsdemo.IScheduleTaskDealSingleTest"
>
<property name=
"scheduleTaskType"
ref=
"scheduleTaskType"
/>
<property name=
"scheduleStrategy"
ref=
"scheduleStrategy"
/>
</bean>
</beans>
|
(4)打开控制台,删除全部已有的任务、调度策略,再启动TaskCenter,刷新页面则可看到当前出现了正在运行的任务和调度策略。
(5)也能够在控制台中修改策略,但重启TaskCenter以后会恢复Spring中的配置信息。
20.数据分片方法
为了不TBSchedule管理的多线程重复处理数据,须要采用分片,实现方法以下:
(1)在selectTasks()方法中实现分片获取待处理数据
(2) selectTasks()方法的taskItemList参数为当前线程分配到的可处理任务分片信息,所有任务分片信息由配置文件中的taskItem定义,每项任务信息为TaskItemDefine类型,其中taskItemId标明了分片ID(即0,1,2),parameter为自定义参数(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。
(3)根据上面算出的分片ID来取得相应的待处理任务,例如selectTasks()方法从数据库中获取待处理的交易请求记录,能够将记录的主键或者其余字段HashCode值的余数做为分片ID,在selectTasks()方法中只获取与taskItemList中指定分片ID相同的任务,避免不一样线程重复获取同一任务。
(4)在系统运行过程当中,线程数量会有所变化,所以要在每一个selectTasks()方法执行开始先获取taskItemList。
(5) 每次执行selectTasks()方法取得记录条数不要超过eachFetchDataNum
(6)典型的分片代码实现:
/**
* 根据条件,查询当前调度服务器可处理的任务
* @param taskParameter 任务的自定义参数
* @param ownSign 当前环境名称
* @param taskItemNum 当前任务类型的任务队列数量
* @param taskItemList 当前调度服务器,分配到的可处理队列
* @param eachFetchDataNum 每次获取数据的数量
* @return
* @throws Exception
*/
public
List<Date> selectTasks(String taskParameter, String ownSign,
int
taskItemNum, List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
List<Date> dateList =
new
ArrayList<>();
List<Long> taskIdList =
new
ArrayList<>();
for
(TaskItemDefine t : taskItemList){
//肯定当前任务处理器需处理的任务项id
taskIdList.add(Long.valueOf(t.getTaskItemId()));
}
for
(
int
i=
0
;i<eachFetchDataNum;i++){
// 添加最多指定数量的待处理数据
Date date =
new
Date();
//生成待处理数据
Long remainder = date.getTime() % taskItemNum ;
if
(taskIdList.contains(remainder)){
//根据数据取模,判断当前待处理数据,是否应由当前任务处理器处理
dateList.add(date);
}
TimeUnit.SECONDS.sleep(
1
);
}
return
dateList;
//返回当前任务处理器须要处理的数据
}
|
21.参数说明
(1)zookeeper参数
zkConnectString:zookeeper注册中心地址
rootPath:定时任务根目录,任意指定,调度控制台配置时对应
zkSessionTimeout:超时时间
userName:帐户,任意指定,调度控制台配置时对应
password:密码,任意指定,调度控制台配置时对应
isCheckParentPath:设置为true会检查上级目录是否已经被用做TBSchedule调度,若是是则启动任务失败。
(2)任务参数:
heartBeatRate:心跳频率(毫秒)
judgeDeadInterval:假定服务死亡间隔(毫秒)
sleepTimeNoData: 当没有数据的时候,休眠的时间
sleepTimeInterval:在每次数据处理完后休眠的时间
processorType:处理模式,可为SLEEP或NOTSLEEP。
permitRunStartTime:执行开始时间若是为空则不定时,直接执行。
permitRunEndTime:执行结束时间,与执行开始时间之间的时间才能够执行任务。
taskItems: 任务项数组,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}
在调度过程当中,某线程分得了获取数据的任务,假设获取第1项任务,则在selectTasks()方法的taskItemList参数中包含第1项任务的信息,TaskItemDefine类型,包含:taskItemId、parameter成员变量,分别为:一、{TYPE=B,KIND=2}。可根据该信息取得相应的数据。
fetchDataNumber:selectTasks()方法每次获取数据的数量
executeNumber:每次执行数量,即execute()方法每次取得的任务数量,只在bean实现IScheduleTaskDealMulti才生效。
threadNumber:每一个线程组中的线程数
maxTaskItemsOfOneThreadGroup:每一组线程能分配的最大任务数量,避免在随着机器的减小把正常的服务器压死,0或者空表示不限制
taskParameter:任务的自定义参数,可做为selectTasks中的参数传入。
(3)调度策略参数:
strategyName:策略名称,必须填写,不能有中文和特殊字符。
kind:任务类型,Schedule,Java,Bean 大小写敏感。
taskName:要配置调度策略的任务名称,与这一任务配置的名称要一致。
taskParameter:任务参数,逗号分隔的Key-Value。对任务类型为Java、Bean的有效,对任务类型为Schedule的无效,须要经过任务管理来配置。
assignNum:最大线程组数量,是全部机器(JVM)总共运行的线程组的最大数量。
numOfSingleServer单个机器(JVM)的线程组数量,若是是0,则表示无限制。
IPList:策略运行的机器(JVM)IP列表,127.0.0.1或者localhost会在全部机器上运行。
当在控制台点击中止任务的按钮时。会将任务池中未处理的任务清除,而中止前的在处理的任务将继续执行。
也能够只设置permitRunStartTime,将permitRunEndTime设置为空或者-1。
fetchDataNumber >= threadNumber * 最少循环次数10,不然TBSchedule日志会提示:参数设置不合理,系统性能不佳。
10.理论上单台机器最大线程数为:
线程数threadNumber*单个机器的线程组数量numOfSingleServer,而numOfSingleServer并非上限,仅有1台机器时,该机器的线程组数量能达到assignNum。
11.TBSchedule给各机器以线程组为单位进行分配,全部机器的线程组总数不会超过最大线程组数量assignNum。
12.通常来讲在selectTasks()中获取任务,而后在execute()方法中处理,在SLEEP处理模式下,最后一个活动线程才会去获取任务,所以不会出现重复执行任务的状况。但若是在selectTasks()或execute()中再新建线程或线程池来处理任务,会出现新建线程未处理完成,但TBSchedule框架认为已处理结束从而进行下一次获取任务的操做,可能会重复取出正在处理的任务,所以应尽可能避免新建线程和线程池。
13.在selectTasks()中获取到任务后或者在execute()中处理完任务后应更改状态,防止下次再次取到,形成重复处理。
14.在SLEEP处理模式下,配置的分片数量应合理,分片较多则同一线程组分配过多分片,对不一样分片分别查询获取任务则效率会下降,而分片较少则不利于扩展机器。
15.在SLEEP处理模式下,同一时间只会有一个线程执行selectTasks(),其余线程均处于休眠状态,所以不宜在selectTasks()中进行过多操做而让其余线程等待时间过长,处理工做应尽可能在execute()中进行。或者采用NOTSLEEP模式,让多个线程能够同时运行selectTasks()获取不一样分片的任务。
NOTSLEEP模式须要实现getComparator(),防止从任务池中取出的某项任务正在被本进程中的其余线程处理。原理是在取任务前先取得正在运行的任务放入maybeRepeatTaskList中,取得任务放入任务池后,再与maybeRepeatTaskList中的每项任务对比。同时取任务时加锁保证只有一个线程在取任务。
只有在NotSleep模式下getComparator()才有效,在Sleep模式下无效。
执行getComparator()时会遍历正在处理的任务池。
16.复杂任务能够拆分红多项子任务,并配置不一样的策略,为操做最复杂的子任务分配较多线程,从而提升整体的处理效率。
若是不进行拆分,则会有单个线程处理时间较长,并发的线程数较少,处理时间长短不一, 且任务分配不均匀等问题。例如任务为:从FTP中取得不一样大小的文件进行解析,将每行数据写入分库中。
若是在selectTasks中取得的每一个任务对应一个文件,在execute()中处理任务时(解析文件并入库),效率会很是低。可对解析文件的任务作改造:
改造方案1:在execute()中解析文件后入库时采用线程池处理。但这样仍不能解决任务分配不匀的问题,且引入线程池会增长线程数量。尤为是会形成框架错误判断任务已结束,致使重复处理,所以本方案不合理。
改造方案2:将任务拆分为两个子任务,文件解析和入分库。
子任务1:在原execute()中对文件解析后不直接入分库,而是取1000条合成1条记录存入本地单库的中间表,解析文件耗时较短且记录数较少能够较快完成,且时间不都可以忽略。
子任务2:对中间表记录按照自增主键ID分片,selectTasks()中取得记录,而后拆分红原始单条记录返回,在execute()中对单条记录进行入库处理。
改造方案2的线程数较少,且任务分配会比较均匀,同时避免了单线程处理一个大任务等待时间过长的问题。
17.Zookeeper存储的数据:机器、策略定义、任务定义、任务分片(包含当前属于哪一个机器处理)
18.在zookeeper中每台机器都可保存不一样的线程数等配置,说明不一样机器可使用不一样的线程数等配置,但不建议使用不一样配置。
19.在多任务模式下,executeNumber不要设置的太大,较小一些能够减小等待最后一个活跃线程的时间,而且若是fetchDataNumber<线程数*executeNumber,则会有线程没法分得任务。任务分配在本进程中进行,并不会请求zookeeper,所以设置的较小一些效率更高。
20.当须要重启应用时,要在控制台上先把机器所有中止,等待线程组消失,不然直接重启应用时会出现新的机器实例,旧的机器实例未能释放分片,致使新的机器获取不到任务分片没法执行,控制台上会显示新、旧线程组均为红色。
21.使用同一zookeeper目录的多台机器中,先启动的机器通常为leader,负责分片的分配。
22.控制台显示某线程组红色异常,长时间未取数时,多是取任务的selectTasks()运行异常,或者每次取的任务数量过大,致使长时间未会处理完,能够适当调小eachFetchDataNum。
也有多是由于在SLEEP模式下任务处理时间过长。
23.分片按线程组进行分配,同一机器中有多个线程组时,该机器分得多个分片,也会均匀分配给线程组,每一个线程组各自独立取任务调度,不会同时取任务。
24.当加入新机器时,会请求得到分片。框架10秒扫描一次,若是发现机器数量有变化,且占用分片较多的机器完成任务则会自动从新分配分片。
25.若是每次从数据库里取待处理记录生成任务时,若是总记录数较多,即便取到的有效记录数较少,则扫描整张表花费时间较长,除了创建必要的索引,也应该减小无数据时扫描频次,即下降sleepTimeInterval,也可在selectTasks()中在取到记录后检查数量,若是较少则sleep一段时间再返回任务,也应加大sleepTimeNoData。
26.若是任务处理结束后还要合并结果再进入下一轮处理,则最慢的机器会减慢总体速度,所以要尽可能保证任务分片均匀分给不一样机器,分片数量要能被机器数量整除,也能被最大线程组数量assignNum整除,这样每台机器处理的任务数量大体相同。
27.在Zookeeper链接配置中保存时提示:
错误信息:Zookeeper connecting ......localhost:2181
同时没法进入其余页面,可能因为采用不一样的用户名密码配置过同一目录造zookeeper数据异常,能够在zookeeper中手动删除目录数据,或者更换新目录后重启应用。
在zookeeper中删除目录方法:
[root@192.168.1.5]$ ./zkClient.sh
[zk: localhost:2181(CONNECTED) 0] addauth digest admin:password
[zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo
在控制台没法修改目录的帐户密码,可在zookeeper客户端中删除目录后重建目录及帐户密码。
28.每位用户登陆控制台后打开的配置信息均保存在bin目录下的pamirsScheduleConfig.properties,所以在同一Tomcat下操做不一样的TBSchedule目录时会冲突,已修改TBSchedule的代码解决了这一问题。
29.selectTasks()方法从数据库中取得记录时,能够在select语句中对某字段进行mod取余,这样只获取本线程组所分配的分片。通常有多个分库时,同时也会采用mycat,主键ID没法采用自增,经常使用雪花算法来生成不重复的ID,但对这种ID取模通常不容易均匀,所以可增长建立时间戳字段来用于取模,通常各机器取得的任务数较为均匀。
30.若是使用zookeeper集群,则在tbschedule.properties中配置schedule.zookeeper.address时,格式以下:
IP1:Port1,IP2:Port2,IP3:Port3
31.TBSchedule没法实现任务均衡的转移,即当一台机器处理任务较多,其余机器较闲时,不会转到其余机器。
32.若是使用数据库链接池,则单个机器中的线程数量不要比链接池数量大太多,或者不高于,以防出现线程获取不到数据库链接的状况出现。
33.Sleep模式在实现逻辑上相对简单清晰,但存在一个大任务处理时间长,致使其它线程不工做的状况。
在NotSleep模式下,减小了线程休眠的时间,避免大任务阻塞的状况,但为了不数据被重复处理,增长了CPU在数据比较上的开销。
同时要求业务接口实现对象的比较接口。
若是对任务处理不容许停顿的状况下建议用NotSleep模式,其它状况建议用Sleep模式。
34.主机编号最小的为Leader,若是是Leader第一次启动则会清除全部垃圾数据。
35.若是任务是轮询类型,可将permitRunStartTime、permitRunEndTime均设置为空,将一直运行,可设置sleepTimeNoData、sleepTimeInterval来sleep。
若是要设置在必定时间作内轮询,则能够同时设置permitRunStartTime、permitRunEndTime,在这一时间段内会执行selectTasks()及execute()。
在到达结束时间时,会将任务池清空,并设置中止运行标志,此时将没法再启动新的线程运行execute(),所以若是selectTasks()运行时间略长于permitRunEndTime-permitRunStartTime,则execute()可能会永远都没法被执行到。
例如:permitRunStartTime设置为:0/10 * * * * ?
permitRunEndTime设置为:5/10 * * * * ?
而selectTasks()执行时间为6秒,则在第6秒时execute()没有机会被执行。
所以对于轮询任务,最好将permitRunStartTime、permitRunEndTime均设置为空.
将permitRunEndTime设置为-1与为空做用一致。
36.若是任务是定时任务,则能够只设置permitRunStartTime,而将permitRunEndTime设置为空或-1,这样在selectTasks()取得任务为空时会sleep(),直到下一个开始时间时才会执行。
例如:permitRunStartTime设置为:0/10 * * * * ?
permitRunEndTime设置为:-1
则在每10秒的第0秒开始执行selectTasks()取任务,若是取到任务则会交给其余线程执行execute(),若是未取到则会sleep(),直到下一个开始时间时才会执行。
若是只但愿同一时间仅有一个线程处理任务,则能够只设置一个分片,并采用SLEEP模式,numOfSingleServer、assignNum均设置为1。
37.每一个心跳周期都会向zookeeper更新心跳信息,若是超过judgeDeadInterval(假定服务死亡间隔)未更新过,则清除zookeeper上的任务信息及Server信息。每一个心跳周期也会从新分配分片。
也会清除zookeeper中分配给已消失机器上的任务信息。
38.若是有定时任务执行出现故障,或者因重启错过了执行时间,若是要在下一次时间点前再次执行,则能够在控制台上临时增长任务类型、策略,来临时定时执行一次,月日也加上防止忘记删除任务致使屡次重复执行。执行完成后再删除该任务类型、策略。
39.有时应用启动后日志显示正常,但不执行任务,有多是zookeeper中数据出现错误,可删除该目录,重启应用便可。
40.在控制台上点击机器的中止按钮时,会将zookeeper中该机器的运行状态设置为false,并清除本机器的任务池中未被处理的任务。在每台机器进程中每2秒刷新一次运行状态,当检测到false,则在任务执行完毕后再也不取任务处理。
41.SystemTBScheduleManagerFactory也可取消,改用@Bean注解,例如:
ScheduleJobConfiguration.java:
package
com.jfbank.schedule.monitor.alarm.tbs;
import
java.util.HashMap;
import
java.util.Map;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
@Configuration
public
class
ScheduleJobConfiguration{
@Bean
(initMethod =
"init"
)
public
TBScheduleManagerFactory tbScheduleManagerFactory(
@Value
(
"${schedule.zookeeper.address}"
)String zkConnectString,
@Value
(
"${schedule.root.catalog}"
)String rootPath,
@Value
(
"${schedule.timeout}"
)String zkSessionTimeout,
@Value
(
"${schedule.username}"
)String userName,
@Value
(
"${schedule.password}"
)String password,
@Value
(
"${schedule.isCheckParentPath}"
)String isCheckParentPath) {
TBScheduleManagerFactory tbScheduleManagerFactory =
new
TBScheduleManagerFactory();
Map<String, String> zkConfig =
new
HashMap<String, String>();
zkConfig.put(
"zkConnectString"
, zkConnectString);
zkConfig.put(
"rootPath"
, rootPath);
zkConfig.put(
"zkSessionTimeout"
, zkSessionTimeout);
zkConfig.put(
"userName"
, userName);
zkConfig.put(
"password"
, password);
zkConfig.put(
"isCheckParentPath"
, isCheckParentPath);
tbScheduleManagerFactory.setZkConfig(zkConfig);
return
tbScheduleManagerFactory;
}
}
|