在实际生产中利用Elastic-job+spring-boot用注解实现多任务的配置spring
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
<exclusions>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
<exclusions>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
复制代码
注意:若是项目中单独导入了高版本的zookeeper的客户端curator包则须要在Elastic-job包中剔除curator,不然会发生curator的版本冲突数据库
即在yaml配置文件中配置Elastic-job-lite 依赖的zookeeper 分布式调度服务器参数apache
@Configuration
public class JobParserAutoConfiguration {
/**
* 服务地址列表 包括IP地址和端口号
*/
@Value("${elasticjob.reg-center.server-lists}")
private String serverList;
/**
* 注册中心的命名空间
*/
@Value("${elasticjob.reg-center.namespace}")
private String namespace;
/**
* 等待重试的间隔时间的初始值.
* 单位毫秒.
*/
@Value("${elasticjob.reg-center.baseSleepTimeMilliseconds}")
private Integer baseSleepTimeMilliseconds;
/**
* 等待重试的间隔时间的最大值.
* 单位毫秒.
*/
@Value("${elasticjob.reg-center.maxSleepTimeMilliseconds}")
private Integer maxSleepTimeMilliseconds;
/**
* 会话超时时间.
* 单位毫秒.
*/
@Value("${elasticjob.reg-center.sessionTimeoutMilliseconds}")
private Integer sessionTimeoutMilliseconds;
/**
* 链接超时时间.
* 单位毫秒.
*/
@Value("${elasticjob.reg-center.connectionTimeoutMilliseconds}")
private Integer connectionTimeoutMilliseconds;
/**
* 最大重试次数.
*/
@Value("${elasticjob.reg-center.maxRetries}")
private Integer maxRetries;
@Bean
public JobConfParser jobConfParser() {
return new JobConfParser();
}
/**
* 初始化zookeeper注册中心
* @return ZookeeperRegistryCenter
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
zookeeperConfiguration.setBaseSleepTimeMilliseconds(baseSleepTimeMilliseconds);
zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeoutMilliseconds);
zookeeperConfiguration.setMaxSleepTimeMilliseconds(maxSleepTimeMilliseconds);
zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeoutMilliseconds);
zookeeperConfiguration.setMaxRetries(maxRetries);
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}
复制代码
@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConf {
/*********************DataflowJobConfiguration START********************/
/**
* 做业名称
*
* @return
*/
String name();
/**
* 类型
*/
JobType jobType();
/**
* cron表达式,用于控制做业触发时间
*
* @return
*/
String cron() default "";
/**
* 做业分片总数
*
* @return
*/
String shardingTotalCount() default "1";
/**
* 分片序列号和参数用等号分隔,多个键值对用逗号分隔
* <p>分片序列号从0开始,不可大于或等于做业分片总数<p>
* <p>如:<p>
* <p>0=a,1=b,2=c<p>
*
* @return
*/
String shardingItemParameters() default "";
/**
* 做业自定义参数
* <p>做业自定义参数,可经过传递该参数为做业调度的业务方法传参,用于实现带参数的做业<p>
* <p>例:每次获取的数据量、做业实例从数据库读取的主键等<p>
*
* @return
*/
String jobParameter() default "";
/**
* 是否开启任务执行失效转移,开启表示若是做业在一次任务执行中途宕机,容许将该次未完成的任务在另外一做业节点上补偿执行
*
* @return
*/
boolean failover() default false;
/**
* 是否开启错过任务从新执行
*
* @return
*/
boolean misfire() default false;
/**
* 做业描述信息
*
* @return
*/
String description() default "";
boolean overwrite() default false;
/*********************DataflowJobConfiguration END********************/
/*********************DataflowJobConfiguration START********************/
/**
* 是否流式处理数据
* <p>若是流式处理数据, 则fetchData不返回空结果将持续执行做业<p>
* <p>若是非流式处理数据, 则处理数据完成后做业结束<p>
*
* @return
*/
boolean streamingProcess() default false;
/*********************DataflowJobConfiguration END********************/
/*********************ScriptJobConfiguration START********************/
/**
* 脚本型做业执行命令行
*
* @return
*/
String scriptCommandLine() default "";
/*********************ScriptJobConfiguration END********************/
/*********************LiteJobConfiguration START********************/
/**
* 监控做业运行时状态
* <p>每次做业执行时间和间隔时间均很是短的状况,建议不监控做业运行时状态以提高效率。<p>
* <p>由于是瞬时状态,因此无必要监控。请用户自行增长数据堆积监控。而且不能保证数据重复选取,应在做业中实现幂等性。<p>
* <p>每次做业执行时间和间隔时间均较长的状况,建议监控做业运行时状态,可保证数据不会重复选取。<p>
*
* @return
*/
boolean monitorExecution() default true;
/**
* 做业监控端口
* <p>建议配置做业监控端口, 方便开发者dump做业信息。<p>
* <p>使用方法: echo “dump” | nc 127.0.0.1 9888<p>
*
* @return
*/
int monitorPort() default -1;
/**
* 大容许的本机与注册中心的时间偏差秒数
* <p>若是时间偏差超过配置秒数则做业启动时将抛异常<p>
* <p>配置为-1表示不校验时间偏差<p>
*
* @return
*/
int maxTimeDiffSeconds() default -1;
/**
* 做业分片策略实现类全路径,默认使用平均分配策略
*
* @return
*/
String jobShardingStrategyClass() default "";
/**
* 修复做业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复,单位:分钟
*
* @return
*/
int reconcileIntervalMinutes() default 10;
/**
* 做业事件追踪的数据源Bean引用
*
* @return
*/
String eventTraceRdbDataSource() default "";
/*********************LiteJobConfiguration END********************/
/**
* 前置后置任务监听实现类,需实现ElasticJobListener接口
*
* @return
*/
String listener() default "";
/**
* 做业是否禁止启动,可用于部署做业时,先禁止启动,部署结束后统一启动
*
* @return
*/
String disabled() default "false";
/**
* 前置后置任务分布式监听实现类,需继承AbstractDistributeOnceElasticJobListener类
*
* @return
*/
String distributedListener() default "";
/**
* 最后一个做业执行前的执行方法的超时时间,单位:毫秒
*
* @return
*/
long startedTimeoutMilliseconds() default Long.MAX_VALUE;
/**
* 最后一个做业执行后的执行方法的超时时间,单位:毫秒
*
* @return
*/
long completedTimeoutMilliseconds() default Long.MAX_VALUE;
/**
* 自定义异常处理类
*
* @return
*/
String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
/**
* 自定义业务处理线程池
*
* @return
*/
String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
复制代码
这些配置均可以在官方文档中查到,这里只是整合起来方便用注解配置多个执行的任务 配置手册 固然也能够自定义一些属性,最后在初始化任务的时候去实现便可服务器
Map<String, Object> beanMap = ctx.getBeansWithAnnotation(ElasticJobConf.class);
for (Object confBean : beanMap.values()) {
Class<?> clz = confBean.getClass();
ElasticJobConf conf = AnnotationUtils.findAnnotation(clz, ElasticJobConf.class);
/**读取注解配置属性略 */
JobCoreConfiguration coreConfig = getJobCoreConfiguration(jobName, cron, shardingItemParameters, description, jobParameter, jobExceptionHandler, executorServiceHandler, failover, misfire, shardingTotalCount);
// 不一样类型的任务配置处理
JobTypeConfiguration typeConfig = getJobTypeConfiguration(jobTypeName, jobClass, scriptCommandLine, streamingProcess, coreConfig);
LiteJobConfiguration jobConfig = getLiteJobConfiguration(jobShardingStrategyClass, overwrite, disabled, monitorExecution, monitorPort, maxTimeDiffSeconds, reconcileIntervalMinutes, typeConfig);
List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(conf);
// 构建SpringJobScheduler对象来初始化任务
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
if (jobTypeName.equals(JobType.SCRIPT)) {
factory.addConstructorArgValue(null);
} else {
factory.addConstructorArgValue(confBean);
}
factory.addConstructorArgValue(zookeeperRegistryCenter);
factory.addConstructorArgValue(jobConfig);
factory.addConstructorArgValue(elasticJobListeners);
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerBeanDefinition(jobName + "SpringJobScheduler", factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean(jobName + "SpringJobScheduler");
springJobScheduler.init();
log.info("【" + jobName + "】\t" + jobClass + "\tinit success");
}
复制代码
这里主要是将注解配置的定时任务类注册到SpringJobScheduler并由它来统一的初始化各个定时任务,每一个定时任务的实例的管理则是由spring来控制markdown
@Slf4j
@Component
@ElasticJobConf(jobType = JobType.DATAFLOW, cron = "${elasticjob.jobs.dataflowJob.mark.cron}", name = "${elasticjob.jobs.dataflowJob.mark.jobName}",
shardingTotalCount = "${elasticjob.jobs.dataflowJob.mark.shardingTotalCount}", shardingItemParameters = "${elasticjob.jobs.dataflowJob.mark.shardingItemParameters}",disabled =
"${elasticjob.jobs.dataflowJob.mark.disabled}"
)
public class DemoDataflowJob implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
//拉取数据逻辑
}
@Override
public void processData(ShardingContext shardingContext, List<String> list) {
//数据处理逻辑
}
}
复制代码
其中shardingContext能够作任务的分片处理,具体详情可见config配置配置手册session