1. ElasticJob 是什么java
ElasticJob 是一个分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。 web
ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。spring
ElasticJob 已于2020年5月28日成为 Apache ShardingSphere 的子项目。 apache
ElasticJob特性:api
2. 实例演示服务器
这里采用最新版本 3.0.0-RC1 session
一、启动zookeeper服务app
首先,下载zookeeper-3.6.0版本,解压后复制一份zoo_sample.cfg,重命名未zoo.cfg,保持默认配置便可socket
注意,zookeeper-3.6.0启动之后会占用三个端口,其中包括8080哦maven
二、编写定时任务业务逻辑
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>elasticjob-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<elasticjob-lite.version>3.0.0-RC1</elasticjob-lite.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>${elasticjob-lite.version}</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-error-handler-dingtalk</artifactId> <version>${elasticjob-lite.version}</version> </dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
elasticjob:
regCenter:
serverLists: 192.168.100.15:2181
namespace: elasticjob-demo
baseSleepTimeMilliseconds: 2000
maxSleepTimeMilliseconds: 4000
maxRetries: 3
jobs:
firstJob:
elasticJobClass: com.example.job.FirstJob
cron: 0/6 * * * * ?
shardingTotalCount: 3
jobErrorHandlerType: DINGTALK
props:
dingtalk:
webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
secret: ASDF
connectTimeout: 3000
readTimeout: 5000
secondJob:
elasticJobClass: com.example.job.SecondJob
cron: 0/10 * * * * ?
shardingTotalCount: 1
jobErrorHandlerType: DINGTALK
props:
dingtalk:
webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
secret: ASDF
connectTimeout: 3000
readTimeout: 5000
两个定时任务
FirstJob.java
package com.example.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
/**
* @author ChengJianSheng
* @date 2021/1/13
*/
@Component
public class FirstJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
// do something by sharding item 0
System.out.println(0);
// int a = 1 / 0;
break;
case 1:
// do something by sharding item 1
System.out.println(1);
break;
case 2:
// do something by sharding item 2
System.out.println(2);
break;
// case n: ...
}
}
}
SecondJob.java
package com.example.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
/**
* @author ChengJianSheng
* @date 2021/1/18
*/
@Component
public class SecondJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("hello");
}
}
项目结构
运行项目便可
经过 ElasticJob-UI 查看任务
https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
3. 启动报错排查
项目启动过程当中,可能会报以下错误
org.apache.zookeeper.ClientCnxn$EndOfStreamException: Unable to read additional data from server sessionid 0x1000bdf48160002, likely server has closed socket
org.apache.shardingsphere.elasticjob.reg.exception.RegException: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout
Caused by: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout
最开始,我觉得是zookeeper版本的问题,后来换了版本也不行,防火墙关了也不行
而后,我怀疑是开发环境问题,因而在本地运行zookeeper,程序连127.0.0.1:2181,竟然能够了
因而我陷入了沉思,为今之计,只剩下一个办法了,打断点调试
找到了异常抛出的位置,以下图
baseSleepTimeMilliseconds 表示 等待重试的间隔时间的初始值
maxSleepTimeMilliseconds 表示 等待重试的间隔时间的最大值
maxRetries 表示 最大重试次数
根据代码中意思,若是在 maxSleepTimeMilliseconds * maxRetries 毫秒内尚未链接成功,则链接关闭,并抛出操做超时异常
联想到,链接本地zookeeper能够,连开发环境zk就不行,再加上观察日志从链接开始到抛异常的时间间隔,我猜到应该是maxSleepTimeMilliseconds设置过短了
因而,application.yml配置文件中将maxSleepTimeMilliseconds设置为4000,baseSleepTimeMilliseconds设置为2000
而后好使
回想刚开始报的那些错,其实根本就尚未连上zookeeper
4. 做业分片
ElasticJob 中任务分片项的概念,使得任务能够在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增长或宕机,ElasticJob 会近乎实时的感知服务器数量的变动,从而从新为分布式的任务服务器分配更加合理的任务分片项,使得任务能够随着资源的增长而提高效率。
任务的分布式执行,须要将一个任务拆分为多个独立的任务项,而后由分布式的服务器分别执行某一个或几个分片项。
也就是说,分片是为了在分布式环境下高效合理利用任务服务器资源的。简单地来说,一个定时任务,咱们运行多台服务器,这意味着有多个实例在执行同一项任务,分片就是为了告诉这些实例各自该处理那些数据,最大限度的下降数据重复处理的问题,同时加快任务处理速度。每一个任务实例该处理哪些数据,是根据分片项来的,在任务代码层面,就能够根据分片项来进行逻辑判断。
举例说明,若是做业分为 4 片,用两台服务器执行,则每一个服务器分到 2 片,分别负责做业的 50% 的负载
分片项
ElasticJob 并不直接提供数据处理的功能,而是将分片项分配至各个运行中的做业服务器,开发者须要自行处理分片项与业务的对应关系。 分片项为数字,始于 0 而终于分片总数减 1。
个性化分片参数
个性化参数能够和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
合理使用个性化参数可让代码更可读。例如,若是配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值便可完成分片项和业务逻辑的对应关系。
分片策略
平均分片策略
根据分片项平均分片。若是做业服务器数量与分片总数没法整除,多余的分片将会顺序的分配至每个做业服务器。
举例说明:
奇偶分片策略
根据做业名称哈希值的奇偶数决定按照做业服务器 IP 升序或是降序的方式分片。
若是做业名称哈希值是偶数,则按照 IP 地址进行升序分片; 若是做业名称哈希值是奇数,则按照 IP 地址进行降序分片。 可用于让服务器负载在多个做业共同运行时分配的更加均匀。
举例说明:
轮询分片策略
根据做业名称轮询分片。
5. 官方文档
https://shardingsphere.apache.org/elasticjob/current/cn/features/elastic/
https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/
https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/configuration/
https://shardingsphere.apache.org/elasticjob/current/cn/dev-manual/