Elastic-Job-Lite 2.1.3 Code Walkthrough
Elastic-Job is a distributed scheduling solution made up of two independent sub-projects: Elastic-Job-Lite and Elastic-Job-Cloud.
Elastic-Job-Lite: a lightweight, decentralized solution that provides distributed job coordination as a plain jar.
Elastic-Job-Cloud: a Mesos + Docker based solution that additionally provides resource governance, application distribution, and process isolation.
Elastic-Job has no central job-scheduling node. Each deployed program triggers its own scheduling through the embedded Quartz when the corresponding time point arrives. ZooKeeper is used for job registration, information storage, and marking the state of job execution; the leader job instance produced by the election process is responsible for computing the job sharding.
A node tree is created in ZooKeeper to hold the job configuration. Every TreeCacheListener is held in the ListenerContainer of the TreeCache object rooted at "/${jobName}". Whenever the ZooKeeper node tree changes (add, remove, update, ...), TreeCache and TreeNode, which implement org.apache.zookeeper.Watcher and org.apache.curator.framework.api.BackgroundCallback, handle the WatchedEvent, and TreeCache calls its publishEvent method to wake up all TreeCacheListeners asynchronously.
At the same time, the current TreeNode re-registers itself as the watcher on its path:
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
client.getData().usingWatcher(this).inBackground(this).forPath(this.path);
client.getChildren().usingWatcher(this).inBackground(this).forPath(this.path);
This is borne out by the ZooKeeper API: an AsyncCallback can be supplied for create, delete, setData, exists, getData, getACL, and getChildren, but a Watcher can only be registered in the ZooKeeper constructor and in exists, getData, and getChildren.
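For reference, a minimal sketch against the raw ZooKeeper client API (connection string, path, and the empty handlers are placeholders):

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkApiDemo {

    public static void main(String[] args) throws Exception {
        Watcher watcher = event -> System.out.println("event: " + event);

        // A Watcher can be registered in the constructor, exists, getData and getChildren.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, watcher);

        // exists/getData/getChildren accept both a Watcher and an AsyncCallback.
        zk.getData("/demo", watcher,
                (AsyncCallback.DataCallback) (rc, path, ctx, data, stat) -> { }, null);
        zk.exists("/demo", watcher,
                (AsyncCallback.StatCallback) (rc, path, ctx, stat) -> { }, null);

        // create/delete/setData accept only an AsyncCallback; there is no Watcher parameter.
        zk.create("/demo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                (AsyncCallback.StringCallback) (rc, path, ctx, name) -> { }, null);

        zk.close();
    }
}

Elastic-Job-Lite's own listener abstraction is built directly on top of this mechanism via Curator's TreeCacheListener: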
package com.dangdang.ddframe.job.lite.internal.listener;

import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

/**
 * Listener for the job registry center.
 *
 * @author zhangliang
 */
public abstract class AbstractJobListener implements TreeCacheListener {

    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }

    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
TreeCache:

private void publishEvent(final TreeCacheEvent event) {
    if (treeState.get() != TreeState.CLOSED) {
        LOG.debug("publishEvent: {}", event);
        executorService.submit(() -> {
            try {
                callListeners(event);
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        });
    }
}

private void callListeners(final TreeCacheEvent event) {
    listeners.forEach(new Function<TreeCacheListener, Void>() {
        @Override
        public Void apply(TreeCacheListener listener) {
            try {
                listener.childEvent(client, event);
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
            return null;
        }
    });
}
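A minimal sketch of how such a listener is attached to a TreeCache (connection string, namespace, and job path are placeholders):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.dangdang.ddframe.job.lite.internal.listener.AbstractJobListener;

public class TreeCacheListenerDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .namespace("dd-job")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();

        // One TreeCache per job, rooted at "/${jobName}"; all listeners share its ListenerContainer.
        TreeCache cache = new TreeCache(client, "/myJob");
        cache.getListenable().addListener(new AbstractJobListener() {

            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                System.out.println(eventType + " " + path + " -> " + data);
            }
        });
        cache.start();
    }
}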
Elastic-Job-Lite operates on ZooKeeper nodes through the Curator framework (2.10.0), a wrapper built on top of zookeeper-3.4.6.jar.
When building your own project, all Curator artifacts should be kept on the same version:
<dependencies>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.3</version>
    </dependency>
</dependencies>
<quartz.version>2.2.1</quartz.version>
<curator.version>2.10.0</curator.version>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>
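If other parts of your build also pull in Curator, one way to keep every Curator artifact pinned to the 2.10.0 line expected by elastic-job-lite 2.1.3 is a dependencyManagement section (a sketch; list only the artifacts your build actually resolves):

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>
    </dependencies>
</dependencyManagement>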
The spring.handlers and spring.schemas files in the elastic-job-lite-spring package declare the XML namespaces and the tags they map to; RegNamespaceHandler and JobNamespaceHandler both extend NamespaceHandlerSupport.
The job-ref attribute takes precedence over the class attribute; JobScheduler's createJobDetail method decides where the elasticJob property of the LiteJob class comes from.
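In simplified form, that decision amounts to the following (a hypothetical helper mirroring the precedence, not the library code):

import com.dangdang.ddframe.job.api.ElasticJob;

// Hypothetical illustration: the bean supplied via job-ref, when present, wins;
// otherwise the class attribute is instantiated reflectively.
final class JobSourceResolver {

    static ElasticJob resolve(final ElasticJob jobRefBean, final String jobClassName) throws ReflectiveOperationException {
        if (jobRefBean != null) {
            return jobRefBean;
        }
        return (ElasticJob) Class.forName(jobClassName).newInstance();
    }
}

The Spring configuration below shows both styles (class and job-ref) side by side: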
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <!--配置做业注册中心 --> <reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置简单做业 --> <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> <bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob"> <property name="fooService" ref="xxx.FooService" /> </bean> <!-- 配置关联Bean做业 --> <job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> <!-- 配置数据流做业 --> <job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> <!-- 配置脚本做业 --> <job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" /> <!-- 配置带监听的简单做业 --> <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"> <job:listener class="xx.MySimpleJobListener" /> <job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" /> </job:simple> <!-- 配置带做业数据库事件追踪的简单做业 --> <job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource"> </job:simple> </beans>
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SmsNoticeTask implements SimpleJob {

    private static final Logger logger = LoggerFactory.getLogger(SmsNoticeTask.class);

    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info("Sharding context for this execution: {}", shardingContext);
        // TODO do something
    }
}
Once a job has started successfully its jobName must not be changed; a renamed job is treated as a brand-new job instance.
Under ${namespace}/${jobName}, the top-level nodes config, leader, servers, instances, and sharding are persisted.
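Taking the namespace dd-job and a job named myJob, the resulting layout looks roughly like this (a simplified sketch; only the most relevant sub-nodes are shown):

/dd-job/myJob/config                          job configuration as JSON, persistent
/dd-job/myJob/leader/election/instance        jobInstanceId of the current leader, ephemeral
/dd-job/myJob/leader/election/latch           LeaderLatch lock node used during election
/dd-job/myJob/leader/sharding/necessary       re-sharding flag
/dd-job/myJob/leader/failover/...             failover coordination (when failover is enabled)
/dd-job/myJob/servers/192.168.42.1            one persistent node per server IP
/dd-job/myJob/instances/192.168.42.1@-@6260   one ephemeral node per running instance
/dd-job/myJob/sharding/0 ... N-1              one node per sharding item

Most of these nodes are created when the job registers its start-up information: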
SchedulerFacade:

/**
 * Register the job start-up information.
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
ConfigurationService
A persistent node holding the job's configuration parameters. If a configuration is already persisted in ZooKeeper and overwrite is not set to true, the configuration stored in ZooKeeper wins.
JobScheduler.init()
-> schedulerFacade.updateJobConfiguration(liteJobConfig)
-> configService.persist(liteJobConfig)
/**
 * Persist the distributed job configuration.
 *
 * @param liteJobConfig job configuration
 */
public void persist(final LiteJobConfiguration liteJobConfig) {
    // Validate the job class; if the config node exists in ZooKeeper but its data is null, remove the entire job node.
    checkConflictJob(liteJobConfig);
    if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
        jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
    }
}
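When configuring a job through the Java API instead of the Spring namespace, the overwrite flag is set on LiteJobConfiguration. A minimal bootstrap sketch reusing the SmsNoticeTask above (host and job name are placeholders):

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class JobBootstrap {

    public static void main(String[] args) {
        ZookeeperRegistryCenter regCenter =
                new ZookeeperRegistryCenter(new ZookeeperConfiguration("yourhost:2181", "dd-job"));
        regCenter.init();

        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("smsNoticeJob", "0/10 * * * * ?", 3)
                .shardingItemParameters("0=A,1=B,2=C")
                .build();
        SimpleJobConfiguration simpleConfig =
                new SimpleJobConfiguration(coreConfig, SmsNoticeTask.class.getCanonicalName());

        // overwrite(true): the local configuration replaces whatever is already persisted under /config;
        // otherwise the configuration stored in ZooKeeper wins.
        LiteJobConfiguration liteJobConfig =
                LiteJobConfiguration.newBuilder(simpleConfig).overwrite(true).build();

        new JobScheduler(regCenter, liteJobConfig).init();
    }
}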
LeaderService
Persists the election, sharding, and failover child nodes under the leader node.
/**
 * Leader node paths.
 */
public final class LeaderNode {

    /**
     * Root path of the leader node.
     */
    public static final String ROOT = "leader";

    static final String ELECTION_ROOT = ROOT + "/election";

    static final String INSTANCE = ELECTION_ROOT + "/instance";

    static final String LATCH = ELECTION_ROOT + "/latch";

    private final JobNodePath jobNodePath;

    // ..........
}
Leader election is triggered when the job registers during initialization or when the current leader instance goes offline.
Related listeners: LeaderElectionJobListener, LeaderAbdicationJobListener.
LeaderService:

/**
 * Elect the leader node.
 */
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

    @Override
    public void execute() {
        if (!hasLeader()) {
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}

JobNodeStorage:

/**
 * Execute an operation as the leader.
 *
 * @param latchNode job node name used as the distributed lock
 * @param callback  callback to execute
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        handleException(ex);
    }
}
LeaderLatch:

void reset() throws Exception {
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (debugResetWaitLatch != null) {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }
            if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                setNode(event.getName());
                if (state.get() == State.CLOSED) {
                    setNode(null);
                } else {
                    getChildren();
                }
            } else {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}

public void await() throws InterruptedException, EOFException {
    synchronized (this) {
        while (this.state.get() == LeaderLatch.State.STARTED && !this.hasLeadership.get()) {
            this.wait();
        }
    }
    if (this.state.get() != LeaderLatch.State.STARTED) {
        throw new EOFException();
    }
}
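Outside of the job internals, the same Curator recipe can be used on its own; a minimal sketch (connection string and latch path are placeholders):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        try (LeaderLatch latch = new LeaderLatch(client, "/dd-job/myJob/leader/election/latch")) {
            latch.start();
            latch.await();   // blocks until this participant becomes leader
            // leader-only work goes here, e.g. filling leader/election/instance
        }                    // close() releases leadership and lets the next participant be elected

        client.close();
    }
}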
ShardingService
Persists the necessary node, which is removed once sharding completes.
The resharding flag is set when the job starts, when the total sharding count changes, when job servers change, or when running job instances change.
ShardingListenerManager:

class ShardingTotalCountChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}

class ListenServersChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }
}
FailoverListenerManager:

private boolean isFailoverEnabled() {
    LiteJobConfiguration jobConfig = configService.load(true);
    return null != jobConfig && jobConfig.isFailover();
}

class JobCrashedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
            if (!failoverItems.isEmpty()) {
                for (int each : failoverItems) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            } else {
                for (int each : shardingService.getShardingItems(jobInstanceId)) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}
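Failover only takes effect when it is enabled in the job configuration, and per the official documentation it also requires monitorExecution to be on. With the Spring namespace shown earlier, a sketch would be:

<job:simple id="failoverElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter"
            cron="0/10 * * * * ?" sharding-total-count="3"
            monitor-execution="true" failover="true" />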
ServerService
When a job server registers, a persistent node named after the server IP is created, so job servers are managed per IP.
InstanceService
When a job instance registers, an ephemeral node named after the job instance ID is created. The node name follows this pattern:
e.g. 192.168.42.1@-@6260
package com.dangdang.ddframe.job.lite.api.strategy;

import com.dangdang.ddframe.job.util.env.IpUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.lang.management.ManagementFactory;

/**
 * Job running instance.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

    private static final String DELIMITER = "@-@";

    /**
     * Primary key of the job instance.
     */
    private final String jobInstanceId;

    public JobInstance() {
        jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
    }

    /**
     * Get the IP address of the job server.
     *
     * @return IP address of the job server
     */
    public String getIp() {
        return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
    }
}
Sharding is triggered when a job instance first registers, when running instances change, or when the total sharding count changes. The actual sharding happens the next time the job fires; only the leader node performs it, and the non-leader nodes block until sharding has completed.
ShardingService:

/**
 * Shard the job if resharding is necessary and the current node is the leader.
 *
 * <p>
 * If no job instance is available, no sharding is performed.
 * </p>
 */
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherJobCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(
            jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
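The strategy is pluggable through job-sharding-strategy-class (the default is AverageAllocationJobShardingStrategy). A sketch of a hypothetical strategy, not part of the library, that deals the items out round-robin:

import com.dangdang.ddframe.job.lite.api.strategy.JobInstance;
import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Example only: assigns sharding items to the available instances in round-robin order.
public final class RoundRobinShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(jobInstances.size());
        for (JobInstance each : jobInstances) {
            result.put(each, new ArrayList<Integer>());
        }
        if (jobInstances.isEmpty()) {
            return result;
        }
        for (int item = 0; item < shardingTotalCount; item++) {
            result.get(jobInstances.get(item % jobInstances.size())).add(item);
        }
        return result;
    }
}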