用Elastic-Job可解决分布式重复执行问题java
若是业务工程采用集群化的部署,可能会屡次重复执行定时任务而致使系统的业务逻辑错误,并产生系统故障。spring
job.propertiesapi
simple.id=recommendJob simple.class=com.tf56.mk.job.RecommendJob #天天23点执行 simple.cron=0 0 23 * * ? * ##0 0 23 * * ? * ##0 0/1 * * * ? ## 分片,会根据分片数配置多线程处理。在批量数据处理时,分片处理会提升效率。 simple.shardingTotalCount=1 ## 分片参数配置,使得分片更易于理解,某些时候可易于做业处理。 simple.shardingItemParameters=0=Beijing ## 是否监控做业运行状态。在任务执行时间短、任务间隔时间短等状况下,不建议开启,多少会影响效率。 simple.monitorExecution=true ## 失效转移。在多服务器时比较有用。 simple.failover=true ## 做业描述 simple.description=MK简单做业 ## 做业是否禁止启动。可用于部署做业时,先禁止启动,部署结束后统一启动。 simple.disabled=false ## 本地配置是否可覆盖注册中心配置。若是可覆盖,每次启动做业都以本地配置为准 simple.overwrite=true ## 做业监控端口。建议开启,方便dump做业信息,排查问题,特别是分布式做业状况下。 simple.monitorPort=9888 ## 做业前置、后置处理监听器。有点像是拦截器的概念。 listener.simple=com.tf56.mk.job.listener.SimpleListener simple1.id=projectJob simple1.class=com.tf56.mk.job.ProjectJob ##0 0 1 1/1 * ? 每日1点执行 ##0 0/5 * * * ? 每5分钟执行 simple1.cron=0 0/5 * * * ?
job.xml服务器
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <context:property-placeholder location="classpath:config/*.properties" /> <reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" /> <job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}"> <job:listener class="${listener.simple}" /> <job:event-log /> </job:simple> <job:simple id="projectJob" class="${simple1.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple1.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}"> <job:listener class="${listener.simple}" /> <job:event-log /> </job:simple> </beans>
reg.properites多线程
serverLists=mt-zookeeper-vip:2181 namespace=esjob-monkeyKingService baseSleepTimeMilliseconds=1000 maxSleepTimeMilliseconds=3000 maxRetries=3
package com.tf56.mk.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.tf56.mk.common.util.PropertiesUtil; import com.tf56.mk.dao.RequestRecommendDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component public class RecommendJob implements SimpleJob { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired RequestRecommendDao requestRecommendDao; /** * * @param arg0 */ @Override public void execute(ShardingContext arg0) { logger.info("ES Job Doing--doRecommendJob"); Map map = new HashMap(); map.put("mkExpertRecommendJobTime",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendJobTime"))); map.put("mkExpertRecommendNum",Long.parseLong(PropertiesUtil.getPropertieValue("mkExpertRecommendNum"))); requestRecommendDao.doRecommendJob(map); } }
package com.tf56.mk.job.listener; import com.dangdang.ddframe.job.executor.ShardingContexts; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleListener implements ElasticJobListener { private static Logger logger = LoggerFactory.getLogger(SimpleListener.class); public void beforeJobExecuted(final ShardingContexts shardingContexts) { logger.info("beforeJobExecuted:" + shardingContexts.getJobName()); } public void afterJobExecuted(final ShardingContexts shardingContexts) { logger.info("afterJobExecuted:" + shardingContexts.getJobName()); } }
pom.xmleclipse
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.7</java.version> <elastic-job.version>2.0.1</elastic-job.version> </properties>
<!-- esjob --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${elastic-job.version}</version> <exclusions> <exclusion> <groupId>org.eclipse.jetty.orbit</groupId> <artifactId>javax.annotation</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${elastic-job.version}</version> </dependency>
JobConfig.java 引入xml配置分布式
@Configuration @ImportResource(locations = {"classpath:spring/job.xml"}) public class JobConfig { }