Everything discussed in this article is based on elastic-job version 2.0.5-SNAPSHOT.
The analysis starts from the official demo, elastic-job-example-lite-java, beginning at com.dangdang.ddframe.job.example.JavaMain.
As things stand, the registry center has only a ZooKeeper (zk) implementation, which uses Apache Curator for its zk operations; the demo runs an embedded TestingServer.
DataSource
JobEventRdbConfiguration
I'm not yet sure what this does; judging from the class name, it probably writes job execution records to a database.
I really like the Builder pattern :)
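The core configuration, JobCoreConfiguration, holds the properties shared by all the job types discussed below; it sits one level of abstraction above the type-specific configurations. Its newBuilder method shows that three parameters are required: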
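Property | Type | Required | Description | Notes |
---|---|---|---|---|
jobName | String | Yes | Job name | Not yet clear whether it must be unique |
cron | String | Yes | Cron expression | |
shardingTotalCount | int | Yes | Total number of shards | |
shardingItemParameters | String | No | Custom sharding item parameters | Covered in detail later |
jobParameter | String | No | Job parameter | |
failover | boolean | No | Whether to enable failover | Built on zk, as seen earlier? |
misfire | boolean | No | Whether to re-run misfired executions | Built on Quartz? |
description | String | No | Description | |
jobProperties | JobProperties | No | Extra job properties, held in a Map | |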
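The sketch below makes the required/optional split in the table concrete. It is a minimal, hypothetical builder modeled on `JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)`. The class name `CoreConfigSketch`, the validation messages and the defaults (failover off, misfire on) are my assumptions and are not copied from elastic-job. `jobProperties` is left out for brevity.

```java
// Hypothetical, simplified mirror of JobCoreConfiguration (not the real class).
final class CoreConfigSketch {
    final String jobName;
    final String cron;
    final int shardingTotalCount;
    final String shardingItemParameters;
    final String jobParameter;
    final boolean failover;
    final boolean misfire;
    final String description;

    private CoreConfigSketch(Builder b) {
        this.jobName = b.jobName;
        this.cron = b.cron;
        this.shardingTotalCount = b.shardingTotalCount;
        this.shardingItemParameters = b.shardingItemParameters;
        this.jobParameter = b.jobParameter;
        this.failover = b.failover;
        this.misfire = b.misfire;
        this.description = b.description;
    }

    // The three required values are the only newBuilder arguments.
    static Builder newBuilder(String jobName, String cron, int shardingTotalCount) {
        return new Builder(jobName, cron, shardingTotalCount);
    }

    static final class Builder {
        private final String jobName;
        private final String cron;
        private final int shardingTotalCount;
        private String shardingItemParameters = "";
        private String jobParameter = "";
        private boolean failover = false; // assumed default
        private boolean misfire = true;   // assumed default
        private String description = "";

        private Builder(String jobName, String cron, int shardingTotalCount) {
            this.jobName = jobName;
            this.cron = cron;
            this.shardingTotalCount = shardingTotalCount;
        }

        Builder shardingItemParameters(String v) { this.shardingItemParameters = v; return this; }
        Builder jobParameter(String v) { this.jobParameter = v; return this; }
        Builder failover(boolean v) { this.failover = v; return this; }
        Builder misfire(boolean v) { this.misfire = v; return this; }
        Builder description(String v) { this.description = v; return this; }

        // Required values are validated once, at build time.
        CoreConfigSketch build() {
            if (jobName == null || jobName.isEmpty()) {
                throw new IllegalArgumentException("jobName can not be empty");
            }
            if (cron == null || cron.isEmpty()) {
                throw new IllegalArgumentException("cron can not be empty");
            }
            if (shardingTotalCount <= 0) {
                throw new IllegalArgumentException("shardingTotalCount should be larger than zero");
            }
            return new CoreConfigSketch(this);
        }
    }
}

class CoreConfigDemo {
    public static void main(String[] args) {
        CoreConfigSketch config = CoreConfigSketch.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3)
                .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                .failover(true)
                .build();
        System.out.println(config.jobName + " shards=" + config.shardingTotalCount + " failover=" + config.failover);
    }
}
```

The design puts the mandatory values into the builder's constructor, which prevents you from forgetting them. Optional values get chained setters with defaults.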
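As the figure above shows, to create a job of a given type you first create the matching type of job configuration. Taking SimpleJob as the example, creating a SimpleJobConfiguration takes two arguments: the JobCoreConfiguration described above, and the fully qualified class name of the job class that holds your business logic. Since the example is SimpleJob, that class must implement SimpleJob. The class hierarchy is shown in the figure below.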
SimpleJobConfiguration has the following properties:
Property | Type | Required | Description | Notes |
---|---|---|---|---|
coreConfig | JobCoreConfiguration | Yes | Core properties | |
jobType | JobType | Yes | Job type | JobType.SIMPLE in the SimpleJob case |
jobClass | String | Yes | Fully qualified class name of the job | |
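Because jobClass is only a String, a wrong name is not caught when the configuration is built. The sketch below shows the kind of check the table implies. It uses stand-in interfaces `ElasticJobSketch` and `SimpleJobSketch` in place of the real `ElasticJob` and `SimpleJob`. The names and the check itself are hypothetical; I have not verified where, or whether, elastic-job performs an equivalent validation.

```java
// Hypothetical stand-ins for elastic-job's ElasticJob / SimpleJob interfaces.
interface ElasticJobSketch {}

interface SimpleJobSketch extends ElasticJobSketch {
    void execute(int shardingItem);
}

enum JobTypeSketch { SIMPLE, DATAFLOW, SCRIPT }

// Simplified SimpleJobConfiguration: core config + fixed job type + class name.
final class SimpleJobConfigSketch {
    final String coreJobName;                           // stands in for JobCoreConfiguration
    final JobTypeSketch jobType = JobTypeSketch.SIMPLE; // fixed by this config type
    final String jobClass;                              // fully qualified class name

    SimpleJobConfigSketch(String coreJobName, String jobClass) {
        this.coreJobName = coreJobName;
        this.jobClass = jobClass;
    }

    // True if jobClass names a loadable class that implements the simple-job interface.
    boolean jobClassImplementsSimpleJob() {
        try {
            return SimpleJobSketch.class.isAssignableFrom(Class.forName(jobClass));
        } catch (ClassNotFoundException ex) {
            return false;
        }
    }
}

// Example business-logic class (hypothetical).
class MySimpleJob implements SimpleJobSketch {
    @Override
    public void execute(int shardingItem) {
        System.out.println("processing item " + shardingItem);
    }
}

class SimpleJobConfigDemo {
    public static void main(String[] args) {
        // getName() yields the name Class.forName expects, whatever package this is compiled in.
        SimpleJobConfigSketch ok = new SimpleJobConfigSketch("demo", MySimpleJob.class.getName());
        SimpleJobConfigSketch bad = new SimpleJobConfigSketch("demo", String.class.getName());
        System.out.println(ok.jobClassImplementsSimpleJob() + " " + bad.jobClassImplementsSimpleJob());
    }
}
```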
Creating a JobScheduler requires four things: the registry center, CoordinatorRegistryCenter; the job configuration, LiteJobConfiguration; the event configuration, JobEventConfiguration; and the job listeners, ElasticJobListener.
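A word on LiteJobConfiguration: the configuration follows a combine-split-combine layering. Shared properties are combined in JobCoreConfiguration, split out into type-specific configurations such as SimpleJobConfiguration, and then combined again in LiteJobConfiguration. This is probably done for semantic and logical clarity. The meaning of each property is documented in the comments of its Builder.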
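The combine-split-combine layering can be sketched as three small types. Shared fields live in a core layer. Each job kind gets a type layer that wraps the core layer. A Lite layer then wraps any type layer and adds deployment-level switches. All names are hypothetical stand-ins. The Lite-level fields shown (`overwrite`, `disabled`, `monitorPort`) and the dataflow-only `streamingProcess` flag are a subset I am assuming for illustration. `getJobName()` reaching down through the layers corresponds to the `liteJobConfig.getJobName()` call in the JobScheduler constructor below.

```java
// Layer 1 (combine): properties shared by every job type.
final class CoreLayer {
    final String jobName;
    final String cron;
    final int shardingTotalCount;

    CoreLayer(String jobName, String cron, int shardingTotalCount) {
        this.jobName = jobName;
        this.cron = cron;
        this.shardingTotalCount = shardingTotalCount;
    }
}

// Layer 2 (split): one implementation per job type, each wrapping the core layer.
interface TypeLayer {
    CoreLayer getCoreConfig();
    String getJobClass();
}

final class SimpleTypeLayer implements TypeLayer {
    private final CoreLayer core;
    private final String jobClass;

    SimpleTypeLayer(CoreLayer core, String jobClass) {
        this.core = core;
        this.jobClass = jobClass;
    }

    public CoreLayer getCoreConfig() { return core; }
    public String getJobClass() { return jobClass; }
}

final class DataflowTypeLayer implements TypeLayer {
    private final CoreLayer core;
    private final String jobClass;
    private final boolean streamingProcess; // assumed dataflow-only option

    DataflowTypeLayer(CoreLayer core, String jobClass, boolean streamingProcess) {
        this.core = core;
        this.jobClass = jobClass;
        this.streamingProcess = streamingProcess;
    }

    public CoreLayer getCoreConfig() { return core; }
    public String getJobClass() { return jobClass; }
    boolean isStreamingProcess() { return streamingProcess; }
}

// Layer 3 (combine again): wraps any type layer and adds Lite-specific switches.
final class LiteLayer {
    final TypeLayer typeConfig;
    final boolean overwrite;
    final boolean disabled;
    final int monitorPort;

    LiteLayer(TypeLayer typeConfig, boolean overwrite, boolean disabled, int monitorPort) {
        this.typeConfig = typeConfig;
        this.overwrite = overwrite;
        this.disabled = disabled;
        this.monitorPort = monitorPort;
    }

    // Reaches down through the layers to the shared job name.
    String getJobName() {
        return typeConfig.getCoreConfig().jobName;
    }
}

class LayeringDemo {
    public static void main(String[] args) {
        CoreLayer core = new CoreLayer("javaSimpleJob", "0/5 * * * * ?", 3);
        LiteLayer lite = new LiteLayer(new SimpleTypeLayer(core, "com.example.MySimpleJob"), false, false, -1);
        System.out.println(lite.getJobName());
    }
}
```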
The creation process is complex enough that I have to walk through the code.
```java
private JobScheduler(...) {
    jobName = liteJobConfig.getJobName();
    //1
    jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
    //2
    jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus);
    //3
    jobRegistry = JobRegistry.getInstance();
}
```
This creates the job executor, JobExecutor; the internal service facade, LiteJobFacade; and the job registry, JobRegistry.
```java
public JobExecutor(...) {
    //...
    //1
    setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
    //2
    schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
}
```
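At //1, a GuaranteeService is attached to every listener that extends AbstractDistributeOnceElasticJobListener. Before and after each execution, **all** sharding items are registered on zk. Only once every item has been registered does the implementation's doBeforeJobExecutedAtLastStarted or doAfterJobExecutedAtLastCompleted method run. After that, the nodes just registered on zk are cleared.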
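The "run once, on the last node to start" behaviour can be sketched with an in-memory set standing in for the zk nodes that GuaranteeService writes. The sketch makes two simplifying assumptions. First, a single synchronized method replaces zk's coordination. Second, whatever the real listener does on nodes that are not the last to start (waiting, timeouts) is omitted. Apart from `doBeforeJobExecutedAtLastStarted`, all names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// In-memory stand-in for the "started" nodes GuaranteeService registers on zk.
final class GuaranteeSketch {
    private final int shardingTotalCount;
    private final Set<Integer> startedItems = new HashSet<>();

    GuaranteeSketch(int shardingTotalCount) {
        this.shardingTotalCount = shardingTotalCount;
    }

    // Registers items; returns true only for the call that completes the full set,
    // and clears the registrations at that point (like deleting the zk nodes).
    synchronized boolean registerStartAndCheckAll(Collection<Integer> items) {
        startedItems.addAll(items);
        if (startedItems.size() == shardingTotalCount) {
            startedItems.clear();
            return true;
        }
        return false;
    }
}

// Simplified shape of a distribute-once listener: subclasses supply the callback.
abstract class DistributeOnceListenerSketch {
    private final GuaranteeSketch guarantee;

    DistributeOnceListenerSketch(GuaranteeSketch guarantee) {
        this.guarantee = guarantee;
    }

    // Called on each node before it executes its local sharding items.
    final void beforeJobExecuted(Collection<Integer> localItems) {
        if (guarantee.registerStartAndCheckAll(localItems)) {
            doBeforeJobExecutedAtLastStarted();
        }
    }

    abstract void doBeforeJobExecutedAtLastStarted();
}

class DistributeOnceDemo {
    public static void main(String[] args) {
        GuaranteeSketch guarantee = new GuaranteeSketch(3);
        DistributeOnceListenerSketch listener = new DistributeOnceListenerSketch(guarantee) {
            @Override
            void doBeforeJobExecutedAtLastStarted() {
                System.out.println("all 3 items started; running once");
            }
        };
        listener.beforeJobExecuted(Arrays.asList(0, 1)); // node A: not all items yet
        listener.beforeJobExecuted(Arrays.asList(2));    // node B: completes the set
    }
}
```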
At //2, it creates SchedulerFacade, the facade that provides internal services to the scheduler.
SchedulerFacade

Its constructor:

```java
public SchedulerFacade(...) {
    //1
    configService = new ConfigurationService(regCenter, jobName);
    //2
    leaderElectionService = new LeaderElectionService(regCenter, jobName);
    //3
    serverService = new ServerService(regCenter, jobName);
    //4
    shardingService = new ShardingService(regCenter, jobName);
    //5
    executionService = new ExecutionService(regCenter, jobName);
    //6
    monitorService = new MonitorService(regCenter, jobName);
    //7
    listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
```
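This creates a set of service classes, most of which have corresponding nodes on zk. For a detailed explanation of the zk nodes, see the official documentation.

- `config`: the LiteJobConfiguration, stored as JSON.
- `leader`: used to elect the leader server.
- `servers`: currently keyed by IP; records each server's hostname, status and sharding info.
- `execution`: keyed by sharding item; stores how each item executed. This shows that misfire also relies on zk.

MonitorService (//6 above) can dump the job's data with `echo dump|nc ip port`, where port is the configured monitor port.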
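A small path builder can make the node layout above concrete. This is a hypothetical sketch that assumes every node lives under `/{jobName}`. The child names under `servers/{ip}` (`hostName`, `status`, `sharding`) and under `execution/{item}` (`running`, `completed`, `misfire`, `failover`) are pieced together from this article. They are not a verified copy of elastic-job's layout, and not all of them exist at the same time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical path builder for a job's zk subtree (layout assumed, see text).
final class JobNodeLayoutSketch {
    private final String jobName;

    JobNodeLayoutSketch(String jobName) {
        this.jobName = jobName;
    }

    String full(String node) {
        return "/" + jobName + "/" + node;
    }

    String config() { return full("config"); } // JSON-serialized LiteJobConfiguration
    String leader() { return full("leader"); } // leader election data
    String server(String ip, String child) { return full("servers/" + ip + "/" + child); }
    String execution(int item, String child) { return full("execution/" + item + "/" + child); }

    // Every node name this sketch knows about, for one server and n sharding items.
    // These are possible names, not nodes that all exist simultaneously.
    List<String> describe(String ip, int shardingTotalCount) {
        List<String> result = new ArrayList<>();
        result.add(config());
        result.add(leader());
        for (String child : new String[] {"hostName", "status", "sharding"}) {
            result.add(server(ip, child));
        }
        for (int item = 0; item < shardingTotalCount; item++) {
            for (String child : new String[] {"running", "completed", "misfire", "failover"}) {
                result.add(execution(item, child));
            }
        }
        return result;
    }
}

class NodeLayoutDemo {
    public static void main(String[] args) {
        new JobNodeLayoutSketch("javaSimpleJob").describe("192.168.1.10", 2).forEach(System.out::println);
    }
}
```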
LiteJobFacade
```java
public LiteJobFacade(...) {
    configService = new ConfigurationService(regCenter, jobName);
    shardingService = new ShardingService(regCenter, jobName);
    serverService = new ServerService(regCenter, jobName);
    executionContextService = new ExecutionContextService(regCenter, jobName);
    executionService = new ExecutionService(regCenter, jobName);
    failoverService = new FailoverService(regCenter, jobName);
    this.elasticJobListeners = elasticJobListeners;
    this.jobEventBus = jobEventBus;
}
```
This is another facade class; compared with SchedulerFacade above, it has a few additional services:
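FailoverService

The failover service. Its node on zk is named `failover`; as soon as that node appears, the corresponding node listener runs.

JobEventBus

An asynchronous event bus. So far, the only thing registered on it is the event that writes job execution traces to the database.

JobRegistry

The job registry keeps an in-memory map with jobName as the key and the job schedule controller, JobScheduleController, as the value. Later, the controller can be looked up by key to trigger or pause the job. This clearly works for a single machine only, which is a limitation for distributed jobs. It also settles the earlier question of whether jobName may be duplicated: the design evidently requires it to be unique.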
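At its core, the registry is a process-wide map. The minimal sketch below assumes a ConcurrentHashMap-backed singleton and a plain `put`. In this sketch, registering a second controller under the same jobName silently replaces the first, which is one way to see why names must be unique within a JVM. `ControllerSketch` is a hypothetical stand-in for JobScheduleController. Of the registry methods, only `getInstance` and `addJobScheduleController` appear in the source quoted in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for JobScheduleController.
final class ControllerSketch {
    private final String jobName;
    private volatile boolean paused;

    ControllerSketch(String jobName) { this.jobName = jobName; }

    void pauseJob() { paused = true; }
    void resumeJob() { paused = false; }
    boolean isPaused() { return paused; }
    String getJobName() { return jobName; }
}

// Process-wide registry: jobName -> controller. Visible only inside this JVM.
final class JobRegistrySketch {
    private static final JobRegistrySketch INSTANCE = new JobRegistrySketch();
    private final Map<String, ControllerSketch> controllers = new ConcurrentHashMap<>();

    private JobRegistrySketch() {}

    static JobRegistrySketch getInstance() { return INSTANCE; }

    // A second registration under the same jobName replaces the first one.
    void addJobScheduleController(String jobName, ControllerSketch controller) {
        controllers.put(jobName, controller);
    }

    ControllerSketch getJobScheduleController(String jobName) {
        return controllers.get(jobName);
    }
}

class JobRegistryDemo {
    public static void main(String[] args) {
        JobRegistrySketch registry = JobRegistrySketch.getInstance();
        registry.addJobScheduleController("javaSimpleJob", new ControllerSketch("javaSimpleJob"));
        // Later: look the controller up by name and pause the job, on this machine only.
        registry.getJobScheduleController("javaSimpleJob").pauseJob();
        System.out.println(registry.getJobScheduleController("javaSimpleJob").isPaused());
    }
}
```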
JobScheduler.init():

```java
public void init() {
    // register the job on zk (JobExecutor.init, below)
    jobExecutor.init();
    JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(jobTypeConfig.getCoreConfig().isMisfire()),
            createJobDetail(jobTypeConfig.getJobClass()),
            jobExecutor.getSchedulerFacade(),
            jobName);
    jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
    jobRegistry.addJobScheduleController(jobName, jobScheduleController);
}
```

JobExecutor.init():

```java
public void init() {
    schedulerFacade.clearPreviousServerStatus();
    // cache this job's zk nodes locally
    regCenter.addCacheData("/" + liteJobConfig.getJobName());
    schedulerFacade.registerStartUpInfo(liteJobConfig);
}
```

SchedulerFacade.registerStartUpInfo():

```java
/**
 * Register Elastic-Job startup info.
 *
 * @param liteJobConfig job configuration
 */
public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
    listenerManager.startAllListeners();         // start the zk node listeners
    leaderElectionService.leaderForceElection();  // force a leader election
    configService.persist(liteJobConfig);         // write the configuration to zk
    serverService.persistServerOnline(!liteJobConfig.isDisabled()); // mark this server online; argument = enabled
    serverService.clearJobPausedStatus();
    shardingService.setReshardingFlag();          // request resharding
    monitorService.listen();                      // start the monitor service
    listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount());
}
```
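Among the listeners that `startAllListeners()` starts, the crash-detection ones (JobCrashedJobListener, FailoverJobCrashedJobListener) come down to a predicate over the changed zk path and the event type. The sketch below is hypothetical. The path shape `/{jobName}/execution/{item}/running`, the event enum and the in-memory set of existing nodes stand in for Curator TreeCache events and real zk reads. It also assumes failover should fire only when the crashed item has *not* completed, since an item that already finished needs no takeover.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for TreeCache event types.
enum NodeEventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

final class CrashFailoverCheckSketch {
    private CrashFailoverCheckSketch() {}

    // Returns the item from "/{jobName}/execution/{item}/{suffix}", or null if the path doesn't match.
    static Integer itemOf(String jobName, String path, String suffix) {
        String prefix = "/" + jobName + "/execution/";
        String tail = "/" + suffix;
        if (!path.startsWith(prefix) || !path.endsWith(tail)
                || path.length() <= prefix.length() + tail.length()) {
            return null;
        }
        String middle = path.substring(prefix.length(), path.length() - tail.length());
        try {
            return Integer.valueOf(middle);
        } catch (NumberFormatException ex) {
            return null;
        }
    }

    // suffix is "running" for the JobCrashedJobListener case, "failover" for FailoverJobCrashedJobListener.
    static boolean needFailover(String jobName, String path, String suffix, NodeEventType type,
                                Set<String> existingNodes, boolean failoverEnabled) {
        Integer item = itemOf(jobName, path, suffix);
        return item != null
                && type == NodeEventType.NODE_REMOVED
                && !existingNodes.contains("/" + jobName + "/execution/" + item + "/completed")
                && failoverEnabled;
    }
}

class CrashFailoverDemo {
    public static void main(String[] args) {
        Set<String> nodes = new HashSet<>();
        String path = "/javaSimpleJob/execution/2/running";
        // running node deleted, item not completed, failover on -> take over
        System.out.println(CrashFailoverCheckSketch.needFailover(
                "javaSimpleJob", path, "running", NodeEventType.NODE_REMOVED, nodes, true));
        nodes.add("/javaSimpleJob/execution/2/completed");
        // item already completed -> nothing to fail over
        System.out.println(CrashFailoverCheckSketch.needFailover(
                "javaSimpleJob", path, "running", NodeEventType.NODE_REMOVED, nodes, true));
    }
}
```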
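Listeners and zk nodes involved in the steps above:

- LeaderElectionJobListener -> leaderElectionService; the leader is recorded as `leader/host=ip`.
- ShardingTotalCountChangedJobListener, plus ListenServersChangedJobListener for changes to the `servers` node.
- JobCrashedJobListener: fires only when the changed node ends with `running`, the event is a delete, the `execution/{item}/completed` node does not exist, and `failover` is true in the configuration.
- FailoverJobCrashedJobListener: the same conditions, except that the changed node must end with `failover`.
- FailoverSettingsChangedJobListener: triggered when `config` changes.

Finally, the current total sharding count is recorded in ListenerManager.

JobScheduleController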
```java
public void init() {
    //...
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(jobTypeConfig.getCoreConfig().isMisfire()),
            createJobDetail(jobTypeConfig.getJobClass()),
            jobExecutor.getSchedulerFacade(),
            jobName);
    //...
}
```
Scheduler
```java
private Scheduler createScheduler(final boolean isMisfire) {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        // the Quartz properties depend on whether misfire is enabled
        factory.initialize(getBaseQuartzProperties(isMisfire));
        result = factory.getScheduler();
        // register JobTriggerListener, which receives Quartz's misfire notifications
        result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}
```
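This shows how misfire is handled. When Quartz detects that a job has misfired, it calls the triggerMisfired method of the JobTriggerListener. That method registers a misfire node on zk, and zk's TreeCache then sees the node change and reacts accordingly.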
JobDetail
```java
private JobDetail createJobDetail(final String jobClass) {
    // Quartz always runs LiteJob; the pieces it needs travel in the JobDataMap
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    // What is this for????
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        // otherwise instantiate the configured job class reflectively (ScriptJob is skipped)
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}
```
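One plausible answer to the "What is this for????" question is as follows. Quartz creates a new LiteJob instance for every trigger. As far as I know, its default job factory, `PropertySettingJobFactory`, copies JobDataMap entries into the new instance through matching JavaBean setters. Suppose LiteJob exposes setters for its two fields, and the map keys are `jobFacade` and `elasticJob` (assumed values of the two constants). Then this is how the facade and your ElasticJob reach LiteJob. The stdlib-only sketch below imitates that setter injection with reflection. `DemoLiteJob` and `SetterInjectionSketch` are hypothetical names, and this is not Quartz's code.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for LiteJob: no constructor args, fields arrive via setters.
class DemoLiteJob {
    private Object elasticJob;
    private Object jobFacade;

    public void setElasticJob(Object elasticJob) { this.elasticJob = elasticJob; }
    public void setJobFacade(Object jobFacade) { this.jobFacade = jobFacade; }
    Object getElasticJob() { return elasticJob; }
    Object getJobFacade() { return jobFacade; }
}

final class SetterInjectionSketch {
    private SetterInjectionSketch() {}

    // Creates a fresh job instance, then calls setXxx(value) for each "xxx" key in the data map.
    static <T> T newJob(Class<T> jobClass, Map<String, Object> dataMap) {
        try {
            T job = jobClass.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
                String key = entry.getKey();
                if (key.isEmpty()) {
                    continue;
                }
                String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                for (Method method : jobClass.getMethods()) {
                    if (method.getName().equals(setterName)
                            && method.getParameterCount() == 1
                            && method.getParameterTypes()[0].isInstance(entry.getValue())) {
                        method.invoke(job, entry.getValue());
                    }
                }
            }
            return job;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("cannot create " + jobClass.getName(), ex);
        }
    }
}

class SetterInjectionDemo {
    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("jobFacade", "facade-object");
        dataMap.put("elasticJob", "user-job-object");
        DemoLiteJob job = SetterInjectionSketch.newJob(DemoLiteJob.class, dataMap);
        System.out.println(job.getJobFacade() + ", " + job.getElasticJob());
    }
}
```

Keys without a matching setter, such as an unrelated entry, are simply ignored in this sketch.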
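LiteJob implements Quartz's Job interface and its execute method. It has two properties: an ElasticJob, which is the class containing your own business logic, and a JobFacade, the facade class. Its execute method simply delegates to the executor's execute method. The executor class hierarchy is shown in the figure below.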
Sharding and related handling are already implemented in the abstract class; a concrete executor only has to implement `protected abstract void process(ShardingContext shardingContext);`.
Sharding items are processed in parallel, one pooled task per item:
```java
for (final int each : items) {
    final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
    // stop submitting once the pool has been shut down
    if (executorService.isShutdown()) {
        return;
    }
    // one task per sharding item
    executorService.submit(new Runnable() {

        @Override
        public void run() {
            try {
                process(shardingContexts, each, jobExecutionEvent);
            } finally {
                // count down even if process throws, so latch.await() cannot hang
                latch.countDown();
            }
        }
    });
}
// block until every submitted item has finished
try {
    latch.await();
} catch (final InterruptedException ex) {
    Thread.currentThread().interrupt();
}
```
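The sketch below is a stdlib-only version of the same fan-out/await pattern, reduced to a template method. The abstract class owns the thread pool and the CountDownLatch, and a subclass only supplies `process(item)`. This mirrors how the abstract executor leaves `process(ShardingContext)` to its subclasses. `ShardingExecutorSketch` is a hypothetical name, and ShardingContext, the execution event and error reporting are left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified executor: one pooled task per sharding item, then wait for all.
abstract class ShardingExecutorSketch {
    private final ExecutorService executorService;

    ShardingExecutorSketch(ExecutorService executorService) {
        this.executorService = executorService;
    }

    // Template method: the fan-out/await logic lives here; subclasses only implement process().
    final void execute(Collection<Integer> items) {
        final CountDownLatch latch = new CountDownLatch(items.size());
        for (final int each : items) {
            if (executorService.isShutdown()) {
                return; // same early exit as the loop above
            }
            executorService.submit(() -> {
                try {
                    process(each);
                } finally {
                    latch.countDown(); // always count down so await() cannot hang on a failure
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract void process(int shardingItem);
}

class ShardingExecutorDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ShardingExecutorSketch executor = new ShardingExecutorSketch(pool) {
            @Override
            protected void process(int shardingItem) {
                System.out.println("item " + shardingItem + " on " + Thread.currentThread().getName());
            }
        };
        executor.execute(Arrays.asList(0, 1, 2));
        pool.shutdown();
    }
}
```

Because `execute` only returns after the latch reaches zero, the caller sees every item finished before the next scheduled run.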
```java
public void init() {
    //...
    jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
    jobRegistry.addJobScheduleController(jobName, jobScheduleController);
}
```
This starts scheduling according to the cron expression and puts the controller into the registry.