一,介绍html
Oozie是一个基于Hadoop的工做流调度器,它能够经过Oozie Client 以编程的形式提交不一样类型的做业,如MapReduce做业和Spark做业给底层的计算平台(如 Cloudera Hadoop)执行。java
Quartz是一个开源的调度软件,它为任务的调度执行提供了各类触发器以及监听器git
下面使用Quartz + Oozie 将一个MapReduce程序提交给Cloudera Hadoop执行github
二,调度思路编程
①为何要用Quartz呢?主要是借助Quartz强大的触发器功能。它能够容许知足不一样的调度需求,如每周执行做业一次、重复执行做业多少次。这里有一个重要的问题:假设我有一个做业须要重复执行,当第一次把该做业提交到CDH上执行后,之后须要执行该做业时再也不是又一次把该做业上传到CDH上而后执行,而是把提交过的做业记录下来,下次须要运行时,直接让CDH再运行该做业。ide
②使用Quartz还有一个好处就是:在做业提交的时候能够作一些控制。好比,某种类型的做业提交的频率很高,或者运行时间较短(根据它上次执行完的状况来判断),那么下次运行它时,让它具备更高的优先级。函数
③使用Oozie的目的很明确,就是让它把做业发送给底层的计算平台,如CDH去执行做业。oop
三,Eclipse开发环境搭建测试
主要是须要Quartz和Oozie的依赖包。具体以下:ui
四,实现思路
a) 调度系统目前只考虑调度两种类型的做业:Mapreduce做业和Spark做业。先把这二种做业经过Quartz传递给Oozie,而后再让Oozie把做业提交给CDH计算平台去执行。
b) Quartz提供了一个公共的Job接口。里面只有一个execute()方法,该方法负责完成Quartz所调度的做业的具体功能:把做业传递给Oozie
c) 定义一个抽象类BaseJob,它里面定义了二个方法。这二个方法主要是用来作一些准备工做,即便用Quartz把做业传递给Oozie时须要找到做业在HDFS上的存储目录,并将之复制执行目录下。
d) 最后是两个具体的实现类,MRJob和SparkJob,它们分别表明Mapreduce做业和Spark做业。在实现类里面完成做业的配置,而后将做业提交到CDH计算平台上执行。
相关类图以下:
五,具体代码分析
MRJob.java
实现了org.quartz.Job接口的execute(),该方法当触发器被触发时,会自动地被Quartz Schedule 调度执行。这样,就能够根据须要定义触发器,控制做业什么时候提交给Oozie。
@Override public void execute(JobExecutionContext arg0) throws JobExecutionException { try{ String jobId = wc.run(conf);//submit job to oozie and get the jobId System.out.println("Workflow job submitted"); //wait until the workflow job finishes while(wc.getJobInfo(jobId).getStatus() == Status.RUNNING){ System.out.println("Workflow job running..."); try{ Thread.sleep(10*1000); }catch(InterruptedException e){e.printStackTrace();} } System.out.println("Workflow job completed!"); System.out.println(wc.getJobId(jobId)); }catch(OozieClientException e){e.printStackTrace();} }
测试的main函数程序以下:能够看出对于客户端而言,只须要按照编写常规的Quartz做业方式,就能够调试MapReduce做业了。要想运行该程序,固然还得提早准备到做业的运行环境。具体参考
import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; import java.util.Date; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.SimpleTrigger; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.quartz.job.MRJob; public class QuartzOozieJobTest { public static void main(String[] args) throws Exception{ QuartzOozieJobTest test = new QuartzOozieJobTest(); test.run(); } public void run() throws Exception{ Logger log = LoggerFactory.getLogger(QuartzOozieJobTest.class); log.info("------- Initializing ----------------------"); SchedulerFactory sf = new StdSchedulerFactory(); Scheduler sched = sf.getScheduler(); long startTime = System.currentTimeMillis() + 20000L; Date startTriggerTime = new Date(startTime); JobDetail jobDetail = newJob(MRJob.class).withIdentity("job", "group1").build(); SimpleTrigger trigger = (SimpleTrigger) newTrigger().withIdentity("trigger", "group1").startAt(startTriggerTime).build(); Date ft = sched.scheduleJob(jobDetail, trigger); log.info(jobDetail.getKey() + " will submit at " + ft + " only once."); sched.start(); // sched.shutdown(true); } }
整个项目的源代码下载 :https://github.com/hapjin/JAVA/tree/master/oozie-quartz