Elastic-Job-Lite Official Overview

Elastic-Job-Lite schedules jobs on top of the Quartz scheduler.
Job executor: LiteJob's execute() method instantiates an AbstractElasticJobExecutor.
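For reference, LiteJob itself is a thin adapter. A minimal sketch of its shape, matching the elastic-job-lite 2.x source (the setters that PropertySettingJobFactory relies on are generated by Lombok's @Setter):

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.executor.JobExecutorFactory;
import com.dangdang.ddframe.job.executor.JobFacade;
import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        // Build the matching AbstractElasticJobExecutor subclass and run it.
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}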
/**
 * Simple distributed job interface.
 */
public interface SimpleJob extends ElasticJob {
    
    /**
     * Execute the job.
     *
     * @param shardingContext sharding context
     */
    void execute(ShardingContext shardingContext);
}
/**
 * Dataflow distributed job interface.
 *
 * @param <T> data type
 */
public interface DataflowJob<T> extends ElasticJob {
    
    /**
     * Fetch the data to be processed.
     *
     * @param shardingContext sharding context
     * @return collection of data to be processed
     */
    List<T> fetchData(ShardingContext shardingContext);
    
    /**
     * Process the data.
     *
     * @param shardingContext sharding context
     * @param data collection of data to be processed
     */
    void processData(ShardingContext shardingContext, List<T> data);
}
DataflowJobExecutor:

@Override
protected void process(final ShardingContext shardingContext) {
    DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
    if (dataflowConfig.isStreamingProcess()) {
        streamingExecute(shardingContext);
    } else {
        oneOffExecute(shardingContext);
    }
}

private void streamingExecute(final ShardingContext shardingContext) {
    List<Object> data = fetchData(shardingContext);
    while (null != data && !data.isEmpty()) {
        processData(shardingContext, data);
        if (!getJobFacade().isEligibleForJobRunning()) {
            break;
        }
        data = fetchData(shardingContext);
    }
}

private void oneOffExecute(final ShardingContext shardingContext) {
    List<Object> data = fetchData(shardingContext);
    if (null != data && !data.isEmpty()) {
        processData(shardingContext, data);
    }
}
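A streaming dataflow job therefore keeps looping until fetchData returns nothing. A minimal runnable sketch (QueueDrainJob and its in-memory queue are hypothetical, for illustration only):

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDrainJob implements DataflowJob<String> {

    private static final ConcurrentLinkedQueue<String> QUEUE = new ConcurrentLinkedQueue<>();

    @Override
    public List<String> fetchData(final ShardingContext shardingContext) {
        String item = QUEUE.poll();
        // Returning an empty list ends a streaming job's fetch/process loop.
        return null == item ? Collections.<String>emptyList() : Collections.singletonList(item);
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<String> data) {
        for (String each : data) {
            System.out.println("shard " + shardingContext.getShardingItem() + " processed " + each);
        }
    }
}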
The namespace must not begin with a slash (/):
When the CuratorFramework instance is initialized, the NamespaceImpl constructor calls PathUtils.validatePath("/" + namespace), so a leading slash produces an invalid "//..." path.
java.lang.IllegalArgumentException: Invalid namespace: /zookeeper/scheduler/namespace/local
    at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:48)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:116)
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145)
    at com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter.init(ZookeeperRegistryCenter.java:97)
The <init> entries in the stack trace above denote constructors as represented in the class file; they are invoked when a class is instantiated.
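A hedged configuration sketch (the server address and namespace values are made up for illustration):

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

// Correct: no leading slash in the namespace.
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job-demo");
// Wrong: "/elastic-job-demo" would make Curator validate "//elastic-job-demo"
// and throw IllegalArgumentException: Invalid namespace.
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
regCenter.init();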
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
    JobRegistry.getInstance().setCurrentShardingTotalCount(
            liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
The job is described with a JobTypeConfiguration, which in turn wraps a JobCoreConfiguration, nesting layer by layer. Job properties such as jobName, cron, shardingTotalCount, shardingItemParameters, failover, misfire, jobClass, and monitorExecution are set here.
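A hedged sketch of that nesting with the 2.x builder API (the job name, cron, and parameters are made up; MySimpleJob is a hypothetical SimpleJob implementation):

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

// Innermost layer: core job properties.
JobCoreConfiguration coreConfig = JobCoreConfiguration
        .newBuilder("demoJob", "0/10 * * * * ?", 3)
        .shardingItemParameters("0=A,1=B,2=C")
        .failover(true)
        .misfire(true)
        .build();
// Middle layer: the job type plus the job class.
SimpleJobConfiguration typeConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());
// Outermost layer: lite-specific properties such as monitorExecution and overwrite.
LiteJobConfiguration liteJobConfig = LiteJobConfiguration
        .newBuilder(typeConfig)
        .monitorExecution(true)
        .overwrite(true)
        .build();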
During JobScheduler.init() -> schedulerFacade.updateJobConfiguration(liteJobConfig), when the config node is registered, configService.persist(liteJobConfig) uses the overwrite property to decide whether to overwrite the configuration stored in ZooKeeper. If overwrite is set to false, the configuration data is taken from ZooKeeper instead.
ConfigurationService:

public void persist(final LiteJobConfiguration liteJobConfig) {
    checkConflictJob(liteJobConfig);
    if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
        jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
    }
}
Starting a job initializes the ZooKeeper node data and turns on ZooKeeper node event listening; it also starts the service that detects the shard assignments of distributed job servers, including offline job instances.
When scheduling is terminated, the leader/election/instance ephemeral node is deleted, the monitoring ServerSocket is closed, and the consistency-detection (reconcile) service is stopped.
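A hedged usage sketch of shutting a job down through its registered controller ("demoJob" is a made-up job name):

JobScheduleController controller = JobRegistry.getInstance().getJobScheduleController("demoJob");
if (null != controller) {
    // Shuts down the underlying Quartz scheduler for this job.
    controller.shutdown();
}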
/**
 * Register job startup information.
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    // Start all ZooKeeper node listeners.
    listenerManager.startAllListeners();
    // Elect the leader.
    leaderService.electLeader();
    // Register the server (by IP) and the job instance as online.
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    // Mark that resharding is required.
    shardingService.setReshardingFlag();
    // Open the monitoring ServerSocket.
    monitorService.listen();
    // Start the consistency-detection (reconcile) service.
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
When JobRunShell is initialized, following Quartz's execution model, PropertySettingJobFactory.setBeanProps injects the entries of JobDetail.getJobDataMap() into the LiteJob instance via reflection.
While the job runs, it provides services for fetching and updating node information.
JobDetail specifies the job's execution class, an org.quartz.Job. Member-variable values are stored by name in JobDetail's JobDataMap. The job instance is built by SimpleJobFactory, and PropertySettingJobFactory sets its member property values.
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}
JobScheduleController keeps the relationship among the scheduler, the jobDetail, and the job name, and exposes control operations for scheduling, rescheduling, triggering, pausing, resuming, and shutting down jobs.
package com.dangdang.ddframe.job.lite.internal.schedule;

import com.dangdang.ddframe.job.exception.JobSystemException;
import lombok.RequiredArgsConstructor;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;

/**
 * Job schedule controller.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
public final class JobScheduleController {
    
    private final Scheduler scheduler;        // the Quartz scheduler
    
    private final JobDetail jobDetail;        // wraps LiteJob
    
    private final String triggerIdentity;     // the job name
    
    /**
     * Schedule the job.
     *
     * @param cron CRON expression
     */
    public void scheduleJob(final String cron) {
        try {
            // RAMJobStore holds the unique key information.
            if (!scheduler.checkExists(jobDetail.getKey())) {
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * Reschedule the job.
     *
     * @param cron CRON expression
     */
    public void rescheduleJob(final String cron) {
        try {
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(TriggerKey.triggerKey(triggerIdentity));
            if (!scheduler.isShutdown() && null != trigger && !cron.equals(trigger.getCronExpression())) {
                scheduler.rescheduleJob(TriggerKey.triggerKey(triggerIdentity), createTrigger(cron));
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    private CronTrigger createTrigger(final String cron) {
        return TriggerBuilder.newTrigger().withIdentity(triggerIdentity)
                .withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
    }
    
    /**
     * Pause the job.
     */
    public void pauseJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.pauseAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * Resume the job.
     */
    public void resumeJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.resumeAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * Trigger the job immediately.
     */
    public void triggerJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.triggerJob(jobDetail.getKey());
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * Shut down the scheduler.
     */
    public void shutdown() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.shutdown();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
}
org.quartz.core.QuartzSchedulerThread (state during init and while running):
QuartzSchedulerThread blocks via Object.wait(long timeout) and loops, creating org.quartz.core.JobRunShell objects and initializing a JobExecutionContextImpl for each run. The JobExecutionContextImpl holds the org.quartz.Job instance, LiteJob, created by the JobFactory. With the elasticJob and jobFacade injected by reflection from JobDetail.getJobDataMap(), LiteJob calls JobExecutorFactory to create an AbstractElasticJobExecutor instance and invokes its execute().
AbstractElasticJobExecutor:

/**
 * Execute the job.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.",
                    jobName, shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
Configuration data is stored on the ${namespaces}/${jobUniqueId}/config node in ZooKeeper.
When the config node's data changes, the CronSettingAndJobEventChangedJobListener fires, regenerates the CronTrigger of org.quartz.impl.StdScheduler, and calls scheduler.rescheduleJob.
public class QuartzScheduler implements RemotableQuartzScheduler:

public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) throws SchedulerException {
    validateState();

    if (triggerKey == null) {
        throw new IllegalArgumentException("triggerKey cannot be null");
    }
    if (newTrigger == null) {
        throw new IllegalArgumentException("newTrigger cannot be null");
    }

    OperableTrigger trig = (OperableTrigger) newTrigger;
    Trigger oldTrigger = getTrigger(triggerKey);
    if (oldTrigger == null) {
        return null;
    } else {
        trig.setJobKey(oldTrigger.getJobKey());
    }
    trig.validate();

    Calendar cal = null;
    if (newTrigger.getCalendarName() != null) {
        cal = resources.getJobStore().retrieveCalendar(newTrigger.getCalendarName());
    }
    Date ft = trig.computeFirstFireTime(cal);

    if (ft == null) {
        throw new SchedulerException("Based on configured schedule, the given trigger will never fire.");
    }

    if (resources.getJobStore().replaceTrigger(triggerKey, trig)) {
        notifySchedulerThread(newTrigger.getNextFireTime().getTime());
        notifySchedulerListenersUnscheduled(triggerKey);
        notifySchedulerListenersSchduled(newTrigger);
    } else {
        return null;
    }

    return ft;
}
TriggerListenerManager manages the JobTriggerStatusJobListener, which responds to manual job-trigger requests.
class JobTriggerStatusJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        // Check the event type, the operation type, and whether the triggered instance is the one served by this machine.
        if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_UPDATED != eventType) {
            return;
        }
        instanceService.clearTriggerFlag();
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
            // TODO Currently a job cannot be triggered while it is running; in the future this will become queued triggering.
            JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
        }
    }
}
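A hedged sketch of what a manual trigger amounts to: updating the job's instance node with the TRIGGER operation, which is what the listener above reacts to (the node path and instanceId are illustrative, following the article's ${namespaces}/${jobUniqueId} layout):

// Writing InstanceOperation.TRIGGER to an existing instance node fires a NODE_UPDATED
// event; JobTriggerStatusJobListener then calls triggerJob() on the local instance.
String instanceNodePath = "/demoJob/instances/" + instanceId;  // instanceId is hypothetical
regCenter.persist(instanceNodePath, InstanceOperation.TRIGGER.name());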
These hooks are invoked from AbstractElasticJobExecutor.execute():
JobFacade:

/**
 * Method executed before the job runs.
 *
 * @param shardingContexts sharding contexts
 */
void beforeJobExecuted(ShardingContexts shardingContexts);

/**
 * Method executed after the job runs.
 *
 * @param shardingContexts sharding contexts
 */
void afterJobExecuted(ShardingContexts shardingContexts);

/**
 * Post a job execution event.
 *
 * @param jobExecutionEvent job execution event
 */
void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

/**
 * Post a job status trace event.
 *
 * @param taskId task id
 * @param state job execution state
 * @param message job execution message
 */
void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
Job servers register by IP under the ${namespaces}/${jobUniqueId}/servers node. A job of a given name generates only one jobInstanceId, whose generation rule involves the process ID of the job server's JVM. JobRegistry keeps the jobName-to-JobInstance and jobName-to-JobScheduleController mappings in ConcurrentHashMaps. Update operations do happen; although the stored objects are newly allocated, the jobInstanceId and the scheduler stay the same.
The job framework is responsible only for distributing shard items sensibly across the participating job servers; each job server must map its assigned shard items to the matching data and process it. Binding real data to logical shard items decouples the framework from the data. When server churn is detected, or the total shard count is modified, sharding only sets a flag instead of resharding immediately.
The shardingTotalCount and shardingItemParameters settings carry this mapping.
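A minimal sketch of matching shard items to data inside a job (RegionPrintJob and the region values are hypothetical; assume shardingTotalCount = 3 and shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou"):

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class RegionPrintJob implements SimpleJob {
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        // getShardingItem() is this server's assigned item (0, 1, or 2);
        // getShardingParameter() is the value mapped to it, e.g. "Shanghai" for item 1.
        String region = shardingContext.getShardingParameter();
        System.out.println("Processing data for shard " + shardingContext.getShardingItem() + ": " + region);
    }
}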
If the shard count is set to 1 and the job runs on more than one server, it executes in a one-master, n-slaves fashion. If the server executing the job crashes, a waiting server takes over at the next job start. With monitoring and failover enabled (monitorExecution = true && failover = true), a standby takes over immediately if the executing server crashes mid-run.
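A hedged sketch of that one-master, n-slaves setup (the job name and cron are made up; MySimpleJob is a hypothetical SimpleJob implementation):

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

// One shard item: only one server executes at a time; the others stand by.
JobCoreConfiguration coreConfig = JobCoreConfiguration
        .newBuilder("haJob", "0 0/5 * * * ?", 1)
        .failover(true)                       // failover = true
        .build();
LiteJobConfiguration liteJobConfig = LiteJobConfiguration
        .newBuilder(new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName()))
        .monitorExecution(true)               // monitorExecution = true
        .build();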
After the task is split into n shard items, each server executes its own assigned items. When a new server joins the cluster, or an existing one goes offline, the current round of execution is left untouched, and resharding is triggered before the next round starts.