qq交流群:812321371
java
Elastic-Job
是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite
和Elastic-Job-Cloud
组成。Elastic-Job-Lite
定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
基于quartz
定时任务框架为基础的,所以具有quartz
的大部分功能
使用zookeeper
作协调,调度中心,更加轻量级
支持任务的分片
支持弹性扩容,能够水平扩展, 当任务再次运行时,会检查当前的服务器数量,从新分片,分片结束以后才会继续执行任务
失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper
断开链接以后,会当即中止做业,而后再去寻找其余空闲的调度服务器,来运行剩余的任务
提供运维界面,能够管理做业和注册中心。mysql
因为项目为微服务,单模块可能在两个实例以上的数量,定时器就会出现多实例同时执行的状况。
通常定时器缺乏管理界面,没法监控定时器是否执行成功。
市面上常见的解决方案为定时器加锁的操做,或者采用第3方分布式定时器。
分布式定时器有多种方案,好比阿里内部的ScheduledX
,当当网的Elastic job
,我的开源的xxl-job
等。git
分片:任务的分布式执行,须要将一个任务拆分为多个独立的任务项,而后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的做业,现有2台服务器。为了快速的执行做业,那么每台服务器应执行做业的50%
。 为知足此需求,可将做业分红2片,每台服务器执行1片。做业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 若是分红10片,则做业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4
;服务器B被分配到分片项5,6,7,8,9
,直接的结果就是服务器A遍历ID
以0-4
结尾的数据;服务器B遍历ID
以5-9
结尾的数据。github
历史轨迹:Elastic-Job
提供了事件追踪功能,可经过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。spring
elasticjob
因为当当网Elastic job
处于1年间未更新阶段,相关jar处于可使用阶段功能不全。考虑到使用场景为多项目使用,将elastic-job-lite-spring
简单封装便于使用。sql
ps:实际version版本请使用最新版
数据库
<dependency> <groupId>com.purgeteam</groupId> <artifactId>elasticjob-spring-boot-starter</artifactId> <version>0.1.1.RELEASE</version> </dependency>
ps: 须要mysql
,zookeeper
支持,请提早搭建好。bootstrap
配置bootstrap.yml
或者application.yml
。服务器
加入如下配置:app
spring: elasticjob: datasource: # job须要的记录数据源 url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false driver-class-name: com.mysql.cj.jdbc.Driver username: root password: Rtqw123OpnmER regCenter: # 注册中心 serverList: 127.0.0.1:2181 namespace: elasticJobDemo
建立定时器类(惟一不一样的地方在于将@Scheduled
改成实现SimpleJob
接口便可)
定时器实现方法编写在execute
方法里。
@Slf4j @Component public class MySimpleJob implements SimpleJob { // @Scheduled(cron = "0 0/1 * * * ?") @Override public void execute(ShardingContext shardingContext) { log.info(String.format("Thread ID: %s, 做业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "做业名称: %s.做业自定义参数: %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); // 分片大体以下:根据配置的分片参数执行相应的逻辑 switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
log:Thread ID: 66, 做业分片总数: 1, 当前分片项: 0.当前参数: Beijing,做业名称: PropertiesSimpleJob.做业自定义参数: test
将ZookeeperRegistryCenter
和JobEventConfiguration
注入。
建立JobScheduler
@Bean(initMethod = "init")
。
在mySimpleJobScheduler
方法里先经过ElasticJobUtils#getLiteJobConfiguration
获取LiteJobConfiguration
对象。
建立SpringJobScheduler
对象返回便可。
@Configuration public class MyJobConfig { // job 名称 private static final String JOB_NAME = "MySimpleJob"; // 定时器cron参数 private static final String CRON = "0 0/1 * * * ?"; // 定时器分片 private static final int SHARDING_TOTAL_COUNT = 1; // 分片参数 private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou"; // 自定义参数 private static final String JOB_PARAMETERS = "parameter"; @Resource private ZookeeperRegistryCenter regCenter; @Resource private JobEventConfiguration jobEventConfiguration; @Bean(initMethod = "init") public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) { LiteJobConfiguration liteJobConfiguration = ElasticJobUtils .getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON, SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS); // 参数:1.定时器实例,2.注册中心类,3.LiteJobConfiguration, // 3.历史轨迹(不须要能够省略) return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration); } }
ElasticJobUtils#getLiteJobConfiguration
参数简介:
/** * 获取 {@link LiteJobConfiguration} 对象 * * @param jobClass 定时器实现类 * @param jobName 定时器名称 * @param cron 定时参数 * @param shardingTotalCount 做业分片总数 * @param shardingItemParameters 当前参数 能够为null * @param jobParameters 做业自定义参数 能够为null * @return {@link LiteJobConfiguration} */ public static LiteJobConfiguration getLiteJobConfiguration( final Class<? extends SimpleJob> jobClass, final String jobName, final String cron, final int shardingTotalCount, final String shardingItemParameters, final String jobParameters) { ... return ...; }
固然也能够用下面的@Configuration
实现简化,配置bootstrap.yml
或者application.yml
。
spring: elasticjob: scheduled: jobConfigMap: // 为map集合 PropertiesSimpleJob: // 定时器key名称 jobName: PropertiesSimpleJob // job名称 cron: 0 0/1 * * * ? // cron表达式 shardingTotalCount: 2 // 分片数量 shardingItemParameters: 0=123,1=332 // 分片参数 jobParameters: test // 自定义参数
注入SpringJobSchedulerFactory
,在propertiesSimpleJobScheduler
方法里调用gerSpringJobScheduler
方法便可。
@Configuration public class PropertiesSimpleJobConfig { @Resource private SpringJobSchedulerFactory springJobSchedulerFactory; @Bean(initMethod = "init") public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) { // 参数:1.定时器实例,2.配置名称,3.是否开启历史轨迹 return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true); } }
ps:这个注解包含了上述方式,简化定时器注入。
继承SimpleJob
实现方法execute
。
在AnnotationSimpleJob
类上加入注解@ElasticJobScheduler
便可。
下面为完整注解。
@Slf4j @ElasticJobScheduler( name = "AnnotationSimpleJob", // 定时器名称 cron = "0/8 * * * * ?", // 定时器表达式 shardingTotalCount = 1, // 做业分片总数 默认为1 shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou", // 分片序列号和参数用等号分隔 不须要参数能够不加 jobParameters = "123", // 做业自定义参数 不须要参数能够不加 isEvent = true // 是否开启数据记录 默认为true ) public class AnnotationSimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info(String.format("Thread ID: %s, 做业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "做业名称: %s.做业自定义参数: %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
分布式job能够解决多个项目同一个定时器都执行的问题,配合elastic-job控制台能够直观监控定时器执行状况等。
示例代码地址: elastic-job-spring-boot