LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务,定时任务和Cron任务。有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用,同时也但愿开源爱好者一块儿贡献。java
github地址: https://github.com/qq254963746/light-task-schedulernode
oschina地址: http://git.oschina.net/hugui/light-task-schedulermysql
这两个地址都会同步更新。感兴趣,请加QQ群:109500214 一块儿探讨、完善。越多人支持,就越有动力去更新,喜欢记得右上角star哈。linux
LTS 有主要有如下四种节点:git
其中JobClinet,JobTracker,TaskTracker节点都是无状态
的。 能够部署多个并动态的进行删减,来实现负载均衡,实现更大的负载量, 而且框架采用FailStore策略使LTS具备很好的容错能力。github
LTS注册中心提供多种实现(Zookeeper,redis等),注册中心进行节点信息暴露,master选举。(Mongo or Mysql)存储任务队列和任务执行日志, netty or mina作底层通讯, 并提供多种序列化方式fastjson, hessian2, java等。redis
LTS支持任务类型:spring
###节点组sql
###FailStoreshell
下图是一个标准的实时任务执行流程。
目先后台带有由ztajy提供的一个简易的认证功能. 用户名密码在auth.cfg中,用户自行修改.
##特性 ###一、Spring支持 LTS能够彻底不用Spring框架,可是考虑到很用用户项目中都是用了Spring框架,因此LTS也提供了对Spring的支持,包括Xml和注解,引入lts-spring.jar
便可。 ###二、业务日志记录器 在TaskTracker端提供了业务日志记录器,供应用程序使用,经过这个业务日志器,能够将业务日志提交到JobTracker,这些业务日志能够经过任务ID串联起来,能够在LTS-Admin中实时查看任务的执行进度。 ###三、SPI扩展支持 SPI扩展能够达到零侵入,只须要实现相应的接口,并实现便可被LTS使用,目前开放出来的扩展接口有
###四、故障转移 当正在执行任务的TaskTracker宕机以后,JobTracker会立马将分配在宕机的TaskTracker的全部任务再分配给其余正常的TaskTracker节点执行。 ###五、节点监控 能够对JobTracker,TaskTracker节点进行资源监控,任务监控等,能够实时的在LTS-Admin管理后台查看,进而进行合理的资源调配。 ###六、多样化任务执行结果支持 LTS框架提供四种执行结果支持,EXECUTE_SUCCESS
,EXECUTE_FAILED
,EXECUTE_LATER
,EXECUTE_EXCEPTION
,并对每种结果采起相应的处理机制,譬如重试。
###七、FailStore容错 采用FailStore机制来进行节点容错,Fail And Store,不会由于远程通讯的不稳定性而影响当前应用的运行。具体FailStore说明,请参考概念说明中的FailStore说明。
##项目编译打包 项目主要采用maven进行构建,目前提供shell脚本的打包。 环境依赖:Java(jdk1.7)
Maven
用户使用通常分为两种: ###一、Maven构建 能够经过maven命令将lts的jar包上传到本地仓库中。在父pom.xml中添加相应的repository,并用deploy命令上传便可。具体引用方式能够参考lts中的例子便可。 ###二、直接Jar引用 须要将lts的各个模块打包成单独的jar包,而且将全部lts依赖包引入。具体引用哪些jar包能够参考lts中的例子便可。
##JobTracker和LTS-Admin部署 提供(cmd)windows
和(shell)linux
两种版本脚原本进行编译和部署:
运行根目录下的sh build.sh
或build.cmd
脚本,会在dist
目录下生成lts-{version}-bin
文件夹
下面是其目录结构,其中bin目录主要是JobTracker和LTS-Admin的启动脚本。jobtracker
中是 JobTracker的配置文件和须要使用到的jar包,lts-admin
是LTS-Admin相关的war包和配置文件。 lts-{version}-bin的文件结构
├── bin │ ├── jobtracker.cmd │ ├── jobtracker.sh │ ├── lts-admin.cmd │ └── lts-admin.sh ├── jobtracker │ ├── conf │ │ └── zoo │ │ ├── jobtracker.cfg │ │ └── log4j.properties │ └── lib │ └── *.jar ├── lts-admin │ ├── conf │ │ ├── log4j.properties │ │ └── lts-admin.cfg │ ├── lib │ │ └── *.jar │ └── lts-admin.war └── tasktracker ├── bin │ └── tasktracker.sh ├── conf │ ├── log4j.properties │ └── tasktracker.cfg └── lib └── *.jar
conf/zoo
下的配置文件,而后运行 sh jobtracker.sh zoo start
便可,若是你想启动两个JobTracker节点,那么你须要拷贝一份zoo,譬如命名为zoo2
,修改下zoo2
下的配置文件,而后运行sh jobtracker.sh zoo2 start
便可。logs文件夹下生成jobtracker-zoo.out
日志。lts-admin/conf
下的配置,而后运行bin
下的sh lts-admin.sh
或lts-admin.cmd
脚本便可。logs文件夹下会生成lts-admin.out
日志,启动成功在日志中会打印出访问地址,用户能够经过这个访问地址访问了。##JobClient(部署)使用 须要引入lts的jar包有lts-jobclient-{version}.jar
,lts-core-{version}.jar
及其它第三方依赖jar。 ###API方式启动
JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); // 提交任务 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式 // job.setTriggerTime(new Date()); // 支持指定时间执行 Response response = jobClient.submitJob(job);
###Spring XML方式启动
<bean id="jobClient" class="com.lts.spring.JobClientFactoryBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_jobClient"/> <property name="masterChangeListeners"> <list> <bean class="com.lts.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="jobFinishedHandler"> <bean class="com.lts.example.support.JobFinishedHandlerImpl"/> </property> <property name="configs"> <props> <!-- 参数 --> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
###Spring 全注解方式
@Configuration public class LTSSpringConfig { @Bean(name = "jobClient") public JobClient getJobClient() throws Exception { JobClientFactoryBean factoryBean = new JobClientFactoryBean(); factoryBean.setClusterName("test_cluster"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setNodeGroup("test_jobClient"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); return factoryBean.getObject(); } }
##TaskTracker(部署使用) 须要引入lts的jar包有lts-tasktracker-{version}.jar
,lts-core-{version}.jar
及其它第三方依赖jar。 ###定义本身的任务执行类
public class MyJobRunner implements JobRunner { private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger(); @Override public Result run(Job job) throws Throwable { try { // TODO 业务逻辑 // 会发送到 LTS (JobTracker上) bizLogger.info("测试,业务日志啊啊啊啊啊"); } catch (Exception e) { return new Result(Action.EXECUTE_FAILED, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "执行成功了,哈哈"); } }
###API方式启动
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setWorkThreads(20); taskTracker.start();
###Spring XML方式启动
<bean id="taskTracker" class="com.lts.spring.TaskTrackerAnnotationFactoryBean" init-method="start"> <property name="jobRunnerClass" value="com.lts.example.support.MyJobRunner"/> <property name="bizLoggerLevel" value="INFO"/> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_trade_TaskTracker"/> <property name="workThreads" value="20"/> <property name="masterChangeListeners"> <list> <bean class="com.lts.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="configs"> <props> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
###Spring注解方式启动
@Configuration public class LTSSpringConfig implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = "taskTracker") public TaskTracker getTaskTracker() throws Exception { TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean(); factoryBean.setApplicationContext(applicationContext); factoryBean.setClusterName("test_cluster"); factoryBean.setJobRunnerClass(MyJobRunner.class); factoryBean.setNodeGroup("test_trade_TaskTracker"); factoryBean.setBizLoggerLevel("INFO"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); factoryBean.setWorkThreads(20); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); // factoryBean.start(); return factoryBean.getObject(); } }
##参数说明
参数 | 是否必须 | 默认值 | 使用范围 | 设置方式 | 参数说明 |
---|---|---|---|---|---|
registryAddress | 必须 | 无 | JobClient,JobTracker,TaskTracker | setRegistryAddress("xxxx") | 注册中心,能够选用zk或者redis,参考值: zookeeper://127.0.0.1:2181 |
clusterName | 必须 | 无 | JobClient,JobTracker,TaskTracker | setClusterName("xxxx") | 集群名称,clusterName相同的全部节点才会组成整个LTS架构 |
listenPort | 必须 | 35001 | JobTracker | setListenPort(xxx) | JobTracker的远程监听端口 |
job.logger | 必须 | console | JobTracker | addConfig("job.logger","xxx") | LTS业务日志记录器,可选值console,mysql,mongo,或者本身实现SPI扩展 |
job.queue | 必须 | mongo | JobTracker | addConfig("job.queue", "xx") | LTS任务队列,可选值mongo,mysql,或者本身实现SPI扩展 |
jdbc.url | 可选 | 无 | JobTracker | addConfig("jdbc.url", "xxx") | mysql链接URL,当job.queue为mysql的时候起做用 |
jdbc.username | 可选 | 无 | JobTracker | addConfig("jdbc.username", "xxx") | mysql链接密码,当job.queue为mysql的时候起做用 |
jdbc.password | 可选 | 无 | JobTracker | addConfig("jdbc.password", "xxx") | mysql链接密码,当job.queue为mysql的时候起做用 |
mongo.addresses | 可选 | 无 | JobTracker | addConfig("mongo.addresses", "xxx") | mongo链接URL,当job.queue为mongo的时候起做用 |
mongo.database | 可选 | 无 | JobTracker | addConfig("mongo.database", "xxx") | mongo数据库名,当job.queue为mongo的时候起做用 |
zk.client | 可选 | zkclient | JobClient,JobTracker,TaskTracker | addConfig("zk.client", "xxx") | zookeeper客户端,可选值zkclient, curator |
job.pull.frequency | 可选 | 3 | TaskTracker | addConfig("job.pull.frequency", "xx") | TaskTracker去向JobTracker Pull任务的频率,针对不一样的场景能够作相应的调整,单位秒 |
job.max.retry.times | 可选 | 10 | JobTracker | addConfig("job.max.retry.times", "xx") | 任务的最大重试次数 |
lts.monitor.url | 可选 | 无 | JobTracker,TaskTracker | addConfig("lts.monitor.url", "xx") | 监控中心地址,也就是LTS-Admin地址,如 http://localhost:8081 |
stop.working | 可选 | false | TaskTracker | addConfig("stop.working", "true") | 主要用于当TaskTracker与JobTracker出现网络隔离的时候,超过必定时间隔离以后,TaskTracker自动中止当前正在运行的任务 |
job.fail.store | 可选 | leveldb | JobClient,TaskTracker | addConfig("job.fail.store", "leveldb") | 可选值:leveldb(默认), rocksdb, berkeleydb, mapdb FailStore实现, leveldb有问题的同窗,能够试试mapdb |
lazy.job.logger | 可选 | false | JobTracker | addConfig("lazy.job.logger", "true") | 可选值:ture,false, 是否延迟批量刷盘日志, 若是启用,采用队列的方式批量将日志刷盘(在应用关闭的时候,可能会形成日志丢失) |
dataPath | 可选 | user.home | JobClient,TaskTracker,JobTracker | setDataPath("xxxx") | FailStore文件存储路径及其它数据存储路径 |
lts.monitor.interval | 可选 | 1 | JobClient,TaskTracker,JobTracker | addConfig("lts.monitor.interval", "2") | 分钟,整数,建议1-5分钟 |
lts.remoting | 可选 | netty | JobClient,TaskTracker,JobTracker | addConfig("lts.remoting", "netty") | 底层通信框架,可选值netty和mina,能够混用,譬如JobTracker是netty, JobClient采用mina |
lts.remoting.serializable.default | 可选 | fastjson | JobClient,TaskTracker,JobTracker | addConfig("lts.remoting.serializable.default", "fastjson") | 底层通信默认序列化方式,可选值 fastjson, hessian2 ,java,底层会自动识别你请求的序列化方式,而后返回数据也是采用与请求的序列化方式返回,假设JobTracker设置的是fastjson,而JobClient是hessian2,那么JobClient提交任务的时候,序列化方式是hessian2,当JobTracker收到请求的时候采用hessian2解码,而后也会将响应数据采用hessian2编码返回给JobClient |
lts.compiler | 可选 | javassist | JobClient,TaskTracker,JobTracker | addConfig("lts.compiler", "javassist") | java编译器,可选值 jdk, javassist(须要引入javassist包) |
##使用建议 通常在一个JVM中只须要一个JobClient实例便可,不要为每种任务都新建一个JobClient实例,这样会大大的浪费资源,由于一个JobClient能够提交多种任务。相同的一个JVM通常也尽可能保持只有一个TaskTracker实例便可,多了就可能形成资源浪费。当遇到一个TaskTracker要运行多种任务的时候,请参考下面的 "一个TaskTracker执行多种任务"。 ##一个TaskTracker执行多种任务 有的时候,业务场景须要执行多种任务,有些人会问,是否是要每种任务类型都要一个TaskTracker去执行。个人答案是否认的,若是在一个JVM中,最好使用一个TaskTracker去运行多种任务,由于一个JVM中使用多个TaskTracker实例比较浪费资源(固然当你某种任务量比较多的时候,能够将这个任务单独使用一个TaskTracker节点来执行)。那么怎么才能实现一个TaskTracker执行多种任务呢。下面是我给出来的参考例子。
/** * 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class) * JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType") */ public class JobRunnerDispatcher implements JobRunner { private static final ConcurrentHashMap<String/*type*/, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>(); static { JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也能够从Spring中拿 JOB_RUNNER_MAP.put("bType", new JobRunnerB()); } @Override public Result run(Job job) throws Throwable { String type = job.getParam("type"); return JOB_RUNNER_MAP.get(type).run(job); } } class JobRunnerA implements JobRunner { @Override public Result run(Job job) throws Throwable { // TODO A类型Job的逻辑 return null; } } class JobRunnerB implements JobRunner { @Override public Result run(Job job) throws Throwable { // TODO B类型Job的逻辑 return null; } }
##TaskTracker的JobRunner测试 通常在编写TaskTracker的时候,只须要测试JobRunner的实现逻辑是否正确,又不想启动LTS进行远程测试。为了方便测试,LTS提供了JobRunner的快捷测试方法。本身的测试类集成com.lts.tasktracker.runner.JobRunnerTester
便可,并实现initContext
和newJobRunner
方法便可。如lts-example
中的例子:
public class TestJobRunnerTester extends JobRunnerTester { public static void main(String[] args) throws Throwable { // 1. Mock Job 数据 Job job = new Job(); job.setTaskId("2313213"); // 2. 运行测试 TestJobRunnerTester tester = new TestJobRunnerTester(); Result result = tester.run(job); System.out.println(JSONUtils.toJSONString(result)); } @Override protected void initContext() { // TODO 初始化Spring容器等 } @Override protected JobRunner newJobRunner() { return new TestJobRunner(); } }
##多网卡选择问题 当机器有内网两个网卡的时候,有时候,用户想让LTS的流量走外网网卡,那么须要在host中,把主机名称的映射地址改成外网网卡地址便可,内网同理。
##打包成独立jar 请在lts-parent/lts
下install便可,会在 lts-parent/lts/target
下生成lts-{version}.jar
##关于节点标识问题 若是在节点启动的时候设置节点标识,LTS会默认设置一个UUID为节点标识,可读性会比较差,可是能保证每一个节点的惟一性,若是用户能本身保证节点标识的惟一性,能够经过 setIdentity
来设置,譬如若是每一个节点都是部署在一台机器(一个虚拟机)上,那么能够将identity设置为主机名称
##SPI扩展说明 ###LTS-Logger扩展
lts-logger-api-{version}.jar
JobLogger
和JobLoggerFactory
接口META-INF/lts/com.lts.biz.logger.JobLoggerFactory
文件,文件内容为xxx=com.lts.biz.logger.xxx.XxxJobLoggerFactory
jobtracker.addConfig("job.logger", "xxx"))
###LTS-Queue扩展 实现方式和LTS-Logger扩展相似,具体参考lts-queue-mysql
或lts-queue-mongo
模块的实现 ##和其它解决方案比较 ###和MQ比较 见docs/LTS业务场景说明.pdf ###和Quartz比较 见docs/LTS业务场景说明.pdf