Elastic-Job定时任务

使用Elastic-Job可以解决分布式环境下定时任务重复执行的问题

如果业务工程采用集群化部署,定时任务可能会被多次重复执行,从而导致系统业务逻辑错误,并引发系统故障。

job.properties

simple.id=recommendJob
simple.class=com.tf56.mk.job.RecommendJob
# Runs every day at 23:00
simple.cron=0 0 23 * * ? *
##0 0 23 * * ? *
##0 0/1 * * * ?
## Sharding: multi-threaded processing is configured according to the shard count. For batch data processing, sharding improves efficiency.
simple.shardingTotalCount=1
## Sharding item parameters: make the shards easier to understand and, in some cases, easier for the job to process.
simple.shardingItemParameters=0=Beijing
## Whether to monitor the job's running state. Not recommended when executions are short or the interval between runs is short, since it costs some efficiency.
simple.monitorExecution=true
## Failover. Mostly useful when running on multiple servers.
simple.failover=true
## Job description
simple.description=MK简单做业
## Whether the job is prevented from starting. Useful for deployments: keep jobs disabled while deploying, then start them all together afterwards.
simple.disabled=false
## Whether the local configuration may overwrite the registry-center configuration. If it may, the local configuration wins on every job start.
simple.overwrite=true
## Job monitor port. Recommended: it makes dumping job information and troubleshooting easier, especially for distributed jobs.
simple.monitorPort=9888
## Listener invoked before and after job execution; conceptually similar to an interceptor.
listener.simple=com.tf56.mk.job.listener.SimpleListener


simple1.id=projectJob
simple1.class=com.tf56.mk.job.ProjectJob
##0 0 1 1/1 * ?  runs daily at 01:00
##0 0/5 * * * ?  runs every 5 minutes
simple1.cron=0 0/5 * * * ?

job.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions for the Elastic-Job registry center and the scheduled jobs. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!-- Loads every .properties file under classpath:config/ so that the ${...} placeholders below can be resolved. -->
    <context:property-placeholder location="classpath:config/*.properties" />

    <!-- ZooKeeper registry center; server list, namespace and retry settings all come from placeholders. -->
    <reg:zookeeper id="regCenter" server-lists="${serverLists}"
                   namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

    <!-- First simple job: every attribute and the listener class are taken from the simple.* / listener.simple properties. -->
    <job:simple id="${simple.id}" class="${simple.class}"
                registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}"
                failover="${simple.failover}" description="${simple.description}"
                disabled="${simple.disabled}" overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

    <!-- Second simple job: only class and cron come from simple1.*; the id is hard-coded and all other
         attributes (sharding, monitoring, failover, description, disabled, overwrite) plus the listener
         reuse the first job's simple.* values.
         NOTE(review): simple1.id is defined in job.properties but is not referenced here, and both jobs
         receive the same monitor-port and description. Confirm that this sharing is intended. -->
    <job:simple id="projectJob" class="${simple1.class}"
                registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple1.cron}" sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}"
                failover="${simple.failover}" description="${simple.description}"
                disabled="${simple.disabled}" overwrite="${simple.overwrite}">
        <job:listener class="${listener.simple}" />
        <job:event-log />
    </job:simple>

</beans>

reg.properties

# Registry-center settings; each key is referenced as a ${...} placeholder by the reg:zookeeper element in job.xml.
# Supplies the server-lists attribute.
serverLists=mt-zookeeper-vip:2181
# Supplies the namespace attribute.
namespace=esjob-monkeyKingService
# Supplies the base-sleep-time-milliseconds attribute.
baseSleepTimeMilliseconds=1000
# Supplies the max-sleep-time-milliseconds attribute.
maxSleepTimeMilliseconds=3000
# Supplies the max-retries attribute.
maxRetries=3

 

package com.tf56.mk.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.tf56.mk.common.util.PropertiesUtil;
import com.tf56.mk.dao.RequestRecommendDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Simple job that reads two numeric settings through {@code PropertiesUtil} and
 * hands them to {@code RequestRecommendDao#doRecommendJob}.
 */
@Component
public class RecommendJob implements SimpleJob {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RequestRecommendDao requestRecommendDao;

    /**
     * Logs a start marker, builds the parameter map and invokes the DAO.
     *
     * @param arg0 sharding context supplied by the scheduler; not read by this job
     */
    @Override
    public void execute(ShardingContext arg0) {
        logger.info("ES Job Doing--doRecommendJob");

        // Raw Map kept on purpose: the DAO's parameter type is declared elsewhere.
        Map params = new HashMap();
        params.put("mkExpertRecommendJobTime", readLongProperty("mkExpertRecommendJobTime"));
        params.put("mkExpertRecommendNum", readLongProperty("mkExpertRecommendNum"));

        requestRecommendDao.doRecommendJob(params);
    }

    /** Looks up {@code key} through {@code PropertiesUtil} and parses the value as a {@code long}. */
    private static long readLongProperty(String key) {
        return Long.parseLong(PropertiesUtil.getPropertieValue(key));
    }
}

 

package com.tf56.mk.job.listener;

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Job listener that writes one info-level log line before and one after a job run.
 *
 * <p>Each line consists of a fixed prefix followed by the job name reported by the
 * supplied {@code ShardingContexts}.
 */
public class SimpleListener implements ElasticJobListener {
    // final: the logger is assigned once and must not be replaceable at runtime.
    private static final Logger logger = LoggerFactory.getLogger(SimpleListener.class);

    /**
     * Logs {@code beforeJobExecuted:} followed by the job name.
     *
     * @param shardingContexts contexts of the run that is about to start
     */
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        // Parameterized form produces the same text but skips message building when info is disabled.
        logger.info("beforeJobExecuted:{}", shardingContexts.getJobName());
    }

    /**
     * Logs {@code afterJobExecuted:} followed by the job name.
     *
     * @param shardingContexts contexts of the run that has just finished
     */
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        logger.info("afterJobExecuted:{}", shardingContexts.getJobName());
    }
}

pom.xml

<!-- Build properties: source encoding, java.version, and the single version shared by both Elastic-Job artifacts below. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <elastic-job.version>2.0.1</elastic-job.version>
</properties>
<!-- Elastic-Job Lite core. The transitive org.eclipse.jetty.orbit:javax.annotation artifact is excluded.
     NOTE(review): the reason for the exclusion is not stated here (presumably a classpath conflict); confirm. -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elastic-job.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.eclipse.jetty.orbit</groupId>
            <artifactId>javax.annotation</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Elastic-Job Lite Spring module; presumably supplies the reg/job XML namespaces used in job.xml (verify). -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elastic-job.version}</version>
</dependency>

JobConfig.java 引入xml配置

/**
 * Configuration class with no members of its own; it exists only to import the
 * XML job definitions located at {@code classpath:spring/job.xml}.
 */
@Configuration
@ImportResource(locations = "classpath:spring/job.xml")
public class JobConfig {
}
相关文章
相关标签/搜索