根据官方文档实现了一个简单的simple类型(Elastic-Job提供Simple、Dataflow和Script 3种做业类型)的Demo。java
基本概念1. 分片概念任务的分布式执行,须要将一个任务拆分为多个独立的任务项,而后由分布式的服务器分别执行某一个或几个分片项。linux 例如:有一个遍历数据库某张表的做业,现有2台服务器。为了快速的执行做业,那么每台服务器应执行做业的50%。 为知足此需求,可将做业分红2片,每台服务器执行1片。做业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 若是分红10片,则做业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。数据库 2. 分片项与业务处理解耦Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的做业服务器,开发者须要自行处理分片项与真实数据的对应关系。windows 3. 个性化参数的适用场景个性化参数即shardingItemParameter,能够和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。api 例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 若是仅按照分片项配置,开发者须要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可让代码更可读,若是配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值便可完成分片项和业务逻辑的对应关系。安全 核心理念1. 分布式调度Elastic-Job-Lite并没有做业调度中心节点,而是基于部署做业框架的程序在到达相应时间点时各自触发调度。服务器 注册中心仅用于做业注册和监控信息存储。而主做业节点仅用于处理分片和清理等功能。架构 2. 做业高可用Elastic-Job-Lite提供最安全的方式执行做业。将分片总数设置为1,并使用多于1台的服务器执行做业,做业将会以1主n从的方式执行。框架 一旦执行做业的服务器崩溃,等待执行的服务器将会在下次做业启动时替补执行。开启失效转移功能效果更好,能够保证在本次做业执行时崩溃,备机当即启动替补执行。分布式 3. 最大限度利用资源Elastic-Job-Lite也提供最灵活的方式,最大限度的提升执行做业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,做业将会合理的利用分布式资源,动态的分配分片项。 例如:3台服务器,分红10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 若是服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的状况下,最大限度的利用现有资源提升吞吐量。 总体架构图 |
这里使用去中心化的版本elastic-job-lite。
JDK:1.8(elastic-job要1.7以上) zookeeper:zookeeper-3.4.12 (官方要求zookeeper-3.4.6以上) elastic-job-lite-core:2.0.3 |
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.0.3</version> </dependency>
由于我使用了slf4j因此还须要引入依赖:
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency>
我写了两个类一个是任务做业的实现类MyELasticJob,很是简单,这里只是写了一些日志的打印:
package com.elasticjob.demo; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; /** * @Description: 做业实现类 * @Author: 张颖辉(yh) * @CreateDate: 2018/7/20 14:01 * @UpdateUser: 张颖辉(yh) * @UpdateDate: 2018/7/20 14:01 * @UpdateRemark: The modified content * @Version: 1.0 */ public class MyELasticJob implements SimpleJob { private Logger logger = LoggerFactory.getLogger(getClass()); /** * @Description: 要执行的做业(分片) * @Author: 张颖辉(yh) * @Date: 2018/7/23 14:42 * @param: [shardingContext] * @return: void * @Version: 1.0 */ @Override public void execute(ShardingContext shardingContext) { //任务分片 switch (shardingContext.getShardingItem()) { case 0: logger.info("job名称={},分片数量={},当前分片={},当前分片名称={},当前自定义参数={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; case 1: logger.info("job名称={},分片数量={},当前分片={},当前分片名称={},当前自定义参数={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; case 2: logger.info("job名称={},分片数量={},当前分片={},当前分片名称={},当前自定义参数={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; } } }
另一个就是关于任务做业的设置和注册中心的设置以及任务的启动:
package com.elasticjob.demo; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobRootConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; /** * @Description: Description * @Author: 张颖辉(yh) * @CreateDate: 2018/7/23 11:30 * @UpdateUser: 张颖辉(yh) * @UpdateDate: 2018/7/23 11:30 * @UpdateRemark: The modified content * @Version: 1.0 */ public class TestJob { public static void main(String[] args) { //做业参数 String jobParameter = "做业节点-linux"; if (isWindows()) { jobParameter = "做业节点-windows"; } new JobScheduler(createRegistryCenter(), createJobConfiguration(jobParameter)).init(); } /** * @Description: 建立注册中心 * @Author: 张颖辉(yh) * @Date: 2018/7/20 15:09 * @param: [] * @return: com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter * @Version: 1.0 */ private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.5.100:2181", "elastic-job-demo")); registryCenter.init(); return registryCenter; } /** * @Description: 建立做业配置 * @Author: 张颖辉(yh) * @Date: 2018/7/20 15:08 * @param: * @return: * @Version: 1.0 */ private static LiteJobConfiguration createJobConfiguration(String jobParameter) { // demoSimpleJob 为jobname, 0/10 * * * * ?为cron表达式, 3 分片数量, 0=北京,1=上海,2=广州 分片对应参数内容, jobParameter 做业自定义参数 // 定义做业核心配置 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("demoSimpleJob_3p", "0/10 * * * * ?", 3).shardingItemParameters("0=北京,1=上海,2=广州").jobParameter(jobParameter).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyELasticJob.class.getCanonicalName()); // 定义Lite做业根配置 LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build(); return liteJobConfiguration; } /** * @Description: 是否是windows * @Author: 张颖辉(yh) * @Date: 2018/7/23 15:15 * @param: * @return: * @Version: 1.0 */ private static boolean isWindows() { return System.getProperties().getProperty("os.name").toUpperCase().indexOf("WINDOWS") != -1; } }
关于做业的分片数量要与做业类中的execute方法里分支相匹配。
而后我在虚拟机下linux中安装了注册中心zookeeper(对应上面代码中指定的zookeeper配置)安装教程
将上面的两个类打成jar,上传到linux中执行,本地windows也执行一份。就会发现三个任务分片有两个在window下执行,
剩下的一个在linux中执行了。
注意:须要两台不一样ip的机器去启动job(做业服务器) 由于ElasticJob默认的分片机制是根据ip来分片的 若是ip相同 它会默认为一台服务器 。因此这里我把代码上传到linux上执行一份。