使用quartz或者spring-task实现任务调度在集群环境下有什么问题?
答:html
一、不敢轻易跟着应用服务多节点部署,可能会重复屡次执行而引起系统逻辑的错误web
二、quartz的集群基于数据库的状态标记,涉及的表多达十多张,节点数量的增长并不能给咱们的每次执行效率带来提高,即不能实现水平扩展。算法
一、一致性(保证定时任务单独执行)spring
二、高可用性 (当前定时任务因外部缘由执行失败后有替代方案)sql
三、定时任务分布式处理(并行处理)数据库
而这些问题已经有前辈都考虑到了,提供了不少的解决方案apache
TBSchedule:由淘宝开源的分布式调度解决方案,目前好像没有维护了,文档较少api
Elastic-Job:由当当网开源的分布式调度解决方案,文档丰富友好,上手简单浏览器
Elastic-Job官网地址:http://elasticjob.io/index_zh.html安全
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API开发做业,开发者仅需一次开发,便可根据须要以Lite或Cloud的方式部署
分布式调度 Elastic-Job-Lite并没有做业调度中心节点,而是基于部署做业框架的程序在到达相应时间点时各自触发调度。 注册中心仅用于做业注册和监控信息存储。而主做业节点仅用于处理分片和清理等功能
做业高可用 Elastic-Job-Lite提供最安全的方式执行做业。将分片总数设置为1,并使用多于1台的服务器执行做业,做业将会以1主n从的方式执行。 一旦执行做业的服务器崩溃,等待执行的服务器将会在下次做业启动时替补执行。开启失效转移功能效果更好,能够保证在本次做业执行时崩溃,备机当即启动替补执行。
最大限度利用资源 Elastic-Job-Lite也提供最灵活的方式,最大限度的提升执行做业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,做业将会合理的利用分布式资源,动态的分配分片项。 例如:3台服务器,分红10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 若是服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的状况下,最大限度的利用现有资源提升吞吐量。
运维平台:提供web控制台用于管理做业
请先安装3.4.6版本以上的zookeeper
一、maven添加jar依赖
<!-- elastic-job start --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.0.0</version> </dependency> <!-- elastic-job end --> <!-- zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency>
二、elastic-job与spring整合配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd"> <!--配置做业注册中心 --> <reg:zookeeper id="regCenter" server-lists="172.18.7.133:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置做业--> <job:simple id="mySimpleJob" class="com.muheda.sipm.task.TestTask" registry-center-ref="regCenter" sharding-total-count="1" cron="0 0/5 * * * ?" overwrite="true" failover="true"/> </beans>
server-lists:zookeeper地址
sharding-total-count为分片数量,设置为1的时候,做业只会被分配到第一台机器进行处理;官网建议此值应为物理机的倍数
①分片策略:
1)、基于平均分配算法的分片策略,也是默认的分片策略; 若是分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器
2)、根据做业名的哈希值奇偶数决定IP升降序算法的分片策略
3)、根据做业名的哈希值对服务器列表进行轮转的分片策略
②分片后台处理:
若是咱们的数据库只有一个,而物理机设置有多台集群,怎么使用分片查询功能?这里有一个处理办法是,第三步中的定时任务业务类中能够经过shardingContext.getShardingTotalCount()和shardingContext.getShardingItem()取到总的分片数量和当前分片编号,咱们在执行业务查询的时候能够在sql后面这样处理:
shardingContext.getShardingItem() = Id%shardingContext.getShardingTotalCount()
三、定时任务业务类:
package com.muheda.sipm.task; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import org.apache.log4j.Logger; /** * @Author: Sorin * @Descriptions: * @Date: Created in 2018/2/1 */ public class TestTask implements SimpleJob { private static Logger logger = Logger.getLogger(SimpleJob.class); @Override public void execute(ShardingContext shardingContext) { logger.info(String.format("ShardingItem: %s | Thread: %s | %s", shardingContext.getShardingItem(), Thread.currentThread().getId(), "SIMPLE")); logger.info("定时任务测试"); } }
四、使用不依赖与Spring的方式:
LoadElasticJob在项目启动后加载一次,里面加载job
@Component public class LoadElasticJob implements InitializingBean { private Logger logger = Logger.getLogger(this.getClass()); static int initBoolean = 0; @Override public void afterPropertiesSet() throws Exception { if(initBoolean==0){ init(); } initBoolean++; } public void init(){ logger.info("定时任务开始启动!"); CoordinatorRegistryCenter regCenter = ElasticJobUtils.setUpRegistryCenter(); // 店铺统计定时任务 ElasticJobUtils.jobShopStat(regCenter); logger.info("定时任务启动成功!"); } }
ElasticJobUtils中定义具体的job属性,至关于以前的xml中的bean.业务类是同样的,这里就不贴出来了
public class ElasticJobUtils { private static final String ZOOKEEPER_URL = PropertiesUtil.getValue("ZOOKEEPER_URL"); private static final String JOB_NAMESPACE = PropertiesUtil.getValue("JOB_NAMESPACE"); public static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_URL, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; } /** * @Descripton: 店铺统计定时任务 * @Author: Sorin * @param regCenter * @param jobEventConfig * @Date: 2018/3/19 */ public static void jobShopStat(final CoordinatorRegistryCenter regCenter){ JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(PropertiesUtil.getValue("job_shop_stat"), PropertiesUtil.getValue("core_shop_stat"), 1).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, StatManageAction.class.getCanonicalName()); new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build()).init(); } }
解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。 打开浏览器访问 http://localhost:8899/ 便可访问控制台