分布式任务调度框架,结合zookeeper技术解决quartz框架在分布式系统中重复的定时任务致使的不可预见的错误java
pommysql
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.github.kuhn-he</groupId> <artifactId>elastic-job-lite-spring-boot-starter</artifactId> <version>2.1.5</version> </dependency> </dependencies>
application.ymlgit
# zk配置 elaticjob: zookeeper: server-lists: localhost:2181 namespace: elastic-job-demo
# 数据源配置
省略
SimpleJobgithub
@ElasticSimpleJob(cron = "0/10 * * * * ?", jobName = "test123", shardingTotalCount = 3, jobParameter = "测试参数", shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou") @Component public class MySimpleJob implements SimpleJob { private static final Logger logger = LoggerFactory.getLogger(MySimpleJob.class); @Override public void execute(ShardingContext shardingContext) { logger.info(String.format("------Thread ID: %s, 任务总片数: %s, " + "当前分片项: %s,当前参数: %s," + "当前任务名称: %s,当前任务参数: %s,"+ "当前任务的id: %s", //获取当前线程的id Thread.currentThread().getId(), //获取任务总片数 shardingContext.getShardingTotalCount(), //获取当前分片项 shardingContext.getShardingItem(), //获取当前的参数 shardingContext.getShardingParameter(), //获取当前的任务名称 shardingContext.getJobName(), //获取当前任务参数 shardingContext.getJobParameter(), //获取任务的id shardingContext.getTaskId() )); } }
默认使用:基于平均分配算法的分片策略web
/* * Copyright 1999-2015 dangdang.com. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * </p> */ package io.elasticjob.lite.api.strategy.impl; import io.elasticjob.lite.api.strategy.JobInstance; import io.elasticjob.lite.api.strategy.JobShardingStrategy; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * 基于平均分配算法的分片策略. * * <p> * 若是分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器. * 如: * 1. 若是有3台服务器, 分红9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]. * 2. 若是有3台服务器, 分红8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5]. * 3. 若是有3台服务器, 分红10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]. * </p> * * @author zhangliang */ public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy { @Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { if (jobInstances.isEmpty()) { return Collections.emptyMap(); } Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount); addAliquant(jobInstances, shardingTotalCount, result); return result; } private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) { Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1); int itemCountPerSharding = shardingTotalCount / shardingUnits.size(); int count = 0; for (JobInstance each : shardingUnits) { List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1); for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) { shardingItems.add(i); } result.put(each, shardingItems); count++; } return result; } private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) { int aliquant = shardingTotalCount % shardingUnits.size(); int count = 0; for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { if (count < aliquant) { entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count); } count++; } } }
第一步:去下载包
https://github.com/miguangying/elastic-job-lite-console#elastic-job-lite-console算法
第二步:解压缩
解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh,windows平台执行start.bat。打开浏览器访问http://localhost:8899/便可访问控制台。spring
8899为默认端口号,可经过启动脚本输入-p自定义端口号。elastic-job-lite-console-${version}.tar.gz可经过mvn install编译获取。sql
第三步:登陆
提供两种帐户,管理员及访客,管理员拥有所有操做权限,访客仅拥有察看权限。默认管理员用户名和密码是root/root,访客用户名和密码是guest/guest,可经过conf\auth.properties修改管理员及访客用户名及密码。express