源码分析ElasticJob任务错过机制(misfire)与幂等性

任务在调度执行中,因为某种缘由未执行完毕,下一次调度任务触发后,在同一个Job实例中,会出现两个线程处理同一个分片上的数据,这样就会形成两个线程可能处理相同的数据,所以Elastic-Job引入幂等机制来解决上述问题。再重申一次ElastciJob的分布式是数据的分布式,一个任务在多个Job实例上运行,每一个Job实例处理该Job的部分数据(数据分片)。
本文重点分析ElasticJob是如何作到以下两点的。java

  1. ElasticJob如何确保在同一个Job实例中多个线程不会处理相同的数据数据库

  2. ElasticJob如何确保数据不会被多个Job实例处理微信

为了解决上述这种状况,ElasticJob引入任务错过补偿执行(misfire)与幂等机制。app

ElasticJob幂等原理

场景:例如任务调度周期为每5s执行一次,正常每次调度任务处理须要耗时2s,若是在某一段时间因为数据库压力变大,致使本来只须要2s就能处理完成的任务,如今须要16s才能运行,在一批数据处理未完成的状况下,每5s又会触发一次调度,若是不加以控制的话,在同一个实例上根据分片条件去查询数据库,查询到的数据有可能相同(部分相同),这样同一条任务数据将被屡次处理,若是业务方法未实现幂等,则会引起很是严重的问题,那ElasticJob是否能够避免这个问题呢?分布式

答案是确定。elasticJob提供了一个配置参数:monitorExecution=true,开启幂等性。post

一个任务触发后,将执行任务处理逻辑,其入口:ui

1AbstractElasticJobExecutor#misfireIfRunning
2if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1
3       if (shardingContexts.isAllowSendJobEvent()) {  // @2
4             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
5                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
6                    shardingContexts.getShardingItemParameters().keySet()));
7       }
8      return;
9}

代码@1:在一个调度任务触发后若是上一次任务还未执行,则须要设置该分片状态为mirefire,表示错失了一次任务执行。
代码@2:若是该分片被设置为mirefire并开启了事件跟踪,将事件跟踪保存在数据库中。lua

接下来详细分析JobFacade.misfireIfRu-nning的实现逻辑:spa

 1/**
2     * 若是当前分片项仍在运行则设置任务被错过执行的标记.
3     * 
4     * @param items 须要设置错过执行的任务分片项
5     * @return 是否错过本次执行
6     */

7    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
8        if (!hasRunningItems(items)) {
9            return false;
10        }
11        setMisfire(items);
12        return true;
13    }

若是存在未完成的分片,则调用setMis-fire(items)方法,在开启monitorExecut-ion(true)的状况下,在分片任务开始时会建立{namespace}/jobname/sharding/{item}/running节点,在任务结束后会删除该目录,因此在判断是否有分片正在运行时,只需判断是否存在上述节点便可。若是存在,调用setMisfire方法。
.net

PS:ElasticJob只有在monitorExecuti-on=true的状况下,才会建立{namespa-ce}/jobname/sharding/{item}/running,m-isfire机制才能生效。

 1ExecutionService#setMisfire
2/**
3     * 设置任务被错过执行的标记.
4     *
5     * @param items 须要设置错过执行的任务分片项
6     */

7    public void setMisfire(final Collection<Integer> items{
8        for (int each : items) {
9            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
10        }
11    }

其实现方式为分配给该实例下的全部分片建立持久节点{namespace}/jobname/shading/{item}/misfire节点,注意,只要分配给该实例的任何一分片未执行完毕,则在该实例下的全部分片都增长m-isfire节点,而后忽略本次任务触发,等待任务结束后再执行。

1AbstractElasticJobExecutor#execute
2execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
3     while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
4         jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
5        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
6}

在任务执行完成后检查是否存在{name-space}/jobname/sharding/{item}/misfire节点,若是存在,则首先清除misfie相关的文件,而后执行任务。

幂等实现方案总结:
在下一个调度周期到达以后,只要发现这个分片的任何一个分片正在执行,则为该实例分片的全部分片都设置为mis-fire,等任务执行完毕后,再统一执行下一次任务调度。

ElasticJob数据分片

ElasticJob基于数据分片,不一样分片根据分片参数(人为配置),从数据库中查询各自数据(任务数据分片),若是当节点宕机,数据会从新分片,若是任务未执行完成,而后执行分片动做,数据是否会被不一样的任务同时处理呢?

答案是不会,由于当节点宕机后是否须要从新分片事件监听器会监听到Job实例表明的节点删除,设置从新分片,在任务被调度执行具体处理逻辑以前,须要从新分片,从新分片的前提又是要全部的分片的任务所有执行完毕,这也依赖是否开启幂等控制(monitorExecution)。

若是开启,ElasticJob能感知正在执行处理的分片,从新分片须要等待当前全部任务所有运行完毕后才会触发,故不会存在不一样节点处理相同数据的问题。

问答:

一、若是一个任务JOB的调度频率为每10s一次,在某个时间,该job执行耗时用了33s(平时只需执行5s),按照正常调度,应该后续会触发3次调度,那该job后执行完,会连续执行3次调度吗?

答案:在33s此次任务执行完成后,若是后面的任务执行在10s内执行完毕的话,只会触发一次,不会补偿3次,由于Ela-sticJob记录任务错失执行,只是建立了misfire节点,并不会记录错失的次数。


更多文章请关注微信公众号:


本文分享自微信公众号 - 中间件兴趣圈(dingwpmz_zjj)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索