elasticjob-2.0.5-SNAPSHOT源码分析

本文讨论均基于elasticjob的2.0.5-SNAPSHOT版本java

先从官网elastic-job-example-lite-java的demo开始分析,从com.dangdang.ddframe.job.example.JavaMain开始分析。linux

根据配置建立注册中心

注册中心uml

如今看来注册中心的实现只有zk的实现,对于zk的操做使用了Apache的curator,demo中使用了内嵌TestingServergit

建立事件跟踪写库config

  1. 定义了一个数据源DataSource
  2. 建立一个时间configJobEventRdbConfiguration

暂且不是很清楚其做用,从类名能够大致猜出是把job的运行记录写入数据库中。github

事件config

建立核心config

很喜欢Builder模式啊 :)数据库

核心config包含建立出下述中不一样job类型的公有属性,是对config的更高一层抽象。根据其newBuilder方法可知,有三个参数是必须的apache

属性名 类型 是否必须 说明 备注
jobName String 做业名称 暂时不清楚是否须要惟一
cron String cron表达式
shardingTotalCount int 做业分片总数
shardingItemParameters String 自定义分片参数 后续详细介绍
jobParameter String job参数
failover boolean 是否失败重试 之前看到的是利用zk实现?
misfire boolean 是否哑火重试 利用quartz实现?
description String 描述
jobProperties JobProperties job额外的参数一个Map

建立不一样job类型的config

jobtype

从上图可知要建立出不一样类型的job,先要建立不一样类型的jobconfig,这里先以SimpleJob为例,建立SimpleJobConfiguration须要两个参数,一个是上述中的JobCoreConfiguration;另外一个是你写的业务逻辑的job实现类的权限定类名,由于这里以SimpleJob为例,因此这个类必须继承SimpleJob,类继承图以下:json

jobextends

SimpleJobConfiguration属性以下:服务器

属性名 类型 是否必须 说明 备注
coreConfig JobCoreConfiguration 核心属性
jobType JobType job类型 以Simple为例则为JobType.SIMPLE
jobClass String job的全限定类名

建立任务调度JobScheduler

建立JobScheduler须要注册中心CoordinatorRegistryCenter,任务相关配置LiteJobConfiguration,事件配置相关JobEventConfiguration,job监听器ElasticJobListener多线程

说说其中的LiteJobConfiguration,合分合,这样作多是语义和逻辑上的清晰,其属性具体意义可参考其Builder中的注释。异步

建立过程过于复杂,不得不贴代码分析了。

private JobScheduler(...) {
        jobName = liteJobConfig.getJobName();
        //1
        jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
        //2
        jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus);
        //3
        jobRegistry = JobRegistry.getInstance();
    }

建立了做业执行器JobExecutor,内部服务门面类LiteJobFacade以及做业注册表JobRegistry

一、JobExecutor`做业启动器

public JobExecutor(...) {
		//...
        //1
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        //2
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
    }

1处给全部实现AbstractDistributeOnceElasticJobListener的类添加GuaranteeService,每次执行先后** 全部 **分片项都注册到zk上,若是都注册成功,才执行实现类里面的doBeforeJobExecutedAtLastStarted方法或者doAfterJobExecutedAtLastCompleted,而后清除zk上刚才注册的节点。

2处建立为调度器提供内部服务的门面类SchedulerFacade

1.一、SchedulerFacade建立

public SchedulerFacade(...) {
		//1
        configService = new ConfigurationService(regCenter, jobName);
        //2
        leaderElectionService = new LeaderElectionService(regCenter, jobName);
        //3
        serverService = new ServerService(regCenter, jobName);
        //4
        shardingService = new ShardingService(regCenter, jobName);
        //5
        executionService = new ExecutionService(regCenter, jobName);
        //6
        monitorService = new MonitorService(regCenter, jobName);
        //7
        listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
    }

建立了各类服务类,这些服务大部分都在zk上有对应的节点信息。zk节点详细解释参考官网说明

  1. 配置服务,在zk上有具体的节点config,值为json格式的LiteJobConfiguration
  2. 选主服务,在zk上有具体的节点leader,用于选择到底哪台服务器执行
  3. 服务器信息服务,在zk上有具体节点servers,先暂时用ip区分,记录服务hostname,状态及分片信息。
  4. 分片服务
  5. 执行服务,节点execution,根据分片号区分,存储具体的分片执行状况。 此处可看出misfire还要借助zk
  6. 监控服务,外界能够开一个socket链接,而后输入dump获取zk的全部注册节点,linux下命令echo dump|nc ip port
  7. 监听管理,监听zk节点变化,好比选主、failover、分片等。** 这里面的代码也不少,到执行的时候应该还有,因此先不看了 **

1.二、LiteJobFacade

public LiteJobFacade(...) {
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        serverService = new ServerService(regCenter, jobName);
        executionContextService = new ExecutionContextService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }

这也是一个门面类,相对于上面的SchedulerFacade,多了几个服务

  1. FailoverService失败重试服务,在zk的根节点为failover,一旦该节点出现,相应的节点监听器就会执行。
  2. JobEventBus异步的事件总线,如今来看只注册了记录任务执行踪影的写数据库事件

1.三、JobRegistry做业注册表

用map将jobName为key,做业调度控制器JobScheduleController为value存于内存中,后期还能够再用key取出控制器进行job的触发、暂停等操做。可见这只是针对于某台机器的,而对于分布式job是有限制的,而且上述中提出的jobName是否可重复,在此处能够推断出设计上是要保证惟一性的

执行JobScheduler#init()方法,执行任务调度

public void init() {
        jobExecutor.init();
        JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
        jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
        jobRegistry.addJobScheduleController(jobName, jobScheduleController);
    }

一、jobExecutor初始化

public void init() {
        schedulerFacade.clearPreviousServerStatus();
        regCenter.addCacheData("/" + liteJobConfig.getJobName());
        schedulerFacade.registerStartUpInfo(liteJobConfig);
    }
  1. 清除servers/ip/status和servers/ip/shutdown
  2. 生成以/jobName为根节点的TreeCache
  3. 注册启动信息

1.一、注册启动信息

/**
     * 注册Elastic-Job启动信息.
     * 
     * @param liteJobConfig 做业配置
     */
    public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
        listenerManager.startAllListeners();
        leaderElectionService.leaderForceElection();
        configService.persist(liteJobConfig);
        serverService.persistServerOnline(!liteJobConfig.isDisabled());
        serverService.clearJobPausedStatus();
        shardingService.setReshardingFlag();
        monitorService.listen();
        listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount());
    }
  1. 开启监听,只要根节点下的节点发生变化就会触发对应的监听
  • 选主监听:LeaderElectionJobListener->leaderElectionService leader/host=ip
  • 分片监听:总分片ShardingTotalCountChangedJobListener和servers节点变化ListenServersChangedJobListener
  • 执行监听:config节点变化监听`MonitorExecutionChangedJobListener
  • 失败重试监听:JobCrashedJobListener必须知足变化的节点以running结尾,而且是删除事件&&execution/分片号/completed节点存在,配置信息failover为true;FailoverJobCrashedJobListener必须知足变化的节点以failover结尾,而且是删除事件&&execution/分片号/completed节点存在,配置信息failover为true;FailoverSettingsChangedJobListener触发条件config变化
  • jobOperationListenerManager.start();
  • configurationListenerManager.start();
  • guaranteeListenerManager.start(); 太多了,本身看代码吧。 :(
  1. 选主服务强制选主,建立一个临时节点leader/host=ip
  2. config节点注册,若是config的overwirte为true会强制重置config节点的信息
  3. 注册执行机器上线节点,能够经过控制disabled属性来动态控制某台执行机器的上下线,另外还注册了一个临时节点servers/ip/status来表示机器的状态
  4. 清除job的pause标记
  5. 设置resharding标志
  6. 若是config中的monitorPort大于0,则开启一个ServerSocket
  7. 设置ListenerManager的总分片数。

一、建立JobScheduleController

public void init() {
        //...
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
       //...
    }

1.一、先建立quartz的Scheduler

private Scheduler createScheduler(final boolean isMisfire) {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties(isMisfire));
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

从这里能够看出misfire的具体流程是quartz监听到某任务发生misfire,触发监听器JobTriggerListener的triggerMisfired方法,而后将misfire节点注册到zk上,zk的TreeCach监听到节点变化作出相应处理。

1.三、建立quartz的JobDetail

private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        //这个地方有什么用????
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

LiteJob是继承自quartz的Job类,实现了execute方法,有两个属性一个ElasticJob是你本身实现的业务逻辑的类,另外一个是JobFacade门面类。execute方法其实调用了executor的execute方法,下面是executor的继承类。

executor.png

分片等处理都已经在抽象类中实现好了,须要实现的是 protected abstract void process(ShardingContext shardingContext);

对于每一个分片是多线程调用,代码以下:

for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

1.四、任务调度执行

public void init() {
      	//...
        jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
        jobRegistry.addJobScheduleController(jobName, jobScheduleController);
    }

根据cron表达式开始启动任务调度,而且把控制器放入注册表中。

相关文章
相关标签/搜索