Hadoop0.2以前版本和以后版本在Job中有很大的改进,本次采用的版本是Hadoop1.1.2版本。java
如今做为做业驱动器,能够直接继承Configured以及实现Tool,这种方式能够很便捷的获取启动时候命令行中输入的做业配置参数,常规的Job启动以下:缓存
public class SortByHash extends Configured implements Tool { public int run(String[] args) throws Exception { //这里面负责配置job属性 Configuration conf=getConf(); String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs(); String tradeDir=paths[0]; String payDir=paths[1]; String joinDir=paths[2]; Job job=new Job(conf,"JoinJob"); job.setJarByClass(JoinMain.class); FileInputFormat.addInputPath(job, new Path(tradeDir)); FileInputFormat.addInputPath(job, new Path(payDir)); FileOutputFormat.setOutputPath(job, new Path(joinDir)); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(TextIntPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int exitCode=ToolRunner.run(new SortByHashPartitioner(), args); System.exit(exitCode); } }
因为Tool的全部实现都须要实现Configurable,而Configured又是Configurable的具体实现,因此要同时继承Configured和实现Tool,这样就不须要实现Tool中定义的全部方法了。利用Tool接口来跑MapReduce,能够在命令行中设置一些参数,比硬编码好不少。网络
注意利用Tool启动做业基本方式以下:ToolRunner首先调用本身的静态方法run,在该方法中会首先建立一个Configurable对象。而后调用GenericOptionsParser解析命令行参数,并设置给刚建立的Configurable对象。而后再次设置主类(这里即SortByHashPartitioner)的setConf方法,最后调用主类的run方法执行。因此在run中要想使用命令行参数必须以下:app
Configurationconf = getConf();分布式
Jobjob = newJob(conf);ide
hadoop0.2以后的做业启动是调用job.waitForCompletion(true);的;而后就会进行做业的提交、执行、完成等操做oop
调用waitForCompletion具体的工做流:编码
第一,调用submit提交做业spa
第二,当参数为true的时候,调用monitorAndPrintJob来进行监听做业的进度。命令行
做业提交即submit():
第一,打开一个JobTracker的链接,这里会建立一个JobClient对象
jobClient= new JobClient((JobConf) getConfiguration());
这里的Configuration在初始化建立job的时候就会主动建立的
第二,根据建立的JobClient来调用submitJobInternal()提交做业给系统。
这里会对于命令行中的选项进行检查
1)获取一个做业编号JobID,jobSubmitClient.getNewJobId(),jobSubmitclient是一个JobSubmissionProtocol,JobTracker就是这个类的子类,在JobClient建立的时候就会new一个
2)获取目录的代理,将运行做业须要的资源jar文件,配置文件都复制到一个以做业ID命名的目录下JobTracker文件系统中。
3)检查做业的输出说明。若是没有指定的输出目录或者输出目录已经不存在,则不提交,返回错误
4)建立做业的输入分片。若是分片没法计算,如输入路径不存在,则不提交,报告错误。
5)将该做业写入做业队列中,而后将该文件写入JobTracker的文件系统中。
6)全部都经过后,真正的提交做业,调用submitJob()告知JobTracker准备执行做业.
做业初始化
当JobTracker接受到submitJob()调用后,会将此调用放入内部队列中queue,交由做业调度器(JobScheduler)进行调度,并对其进行初始化操做。
初始化主要是由做业调度器完成的,建立一个任务运行列表。做业调度器会首先从共享文件系统中获取JobClient已经计算好的分片信息,而后为每个分片建立一个Map任务,建立的reduce数量由mapred.reduce.task来决定,通常是经过setNumReduceTask()设定的。
任务的分配
tasktracker会按期的向jobtracker发送一个心跳告诉是否存活,也是两个之间的通讯通道。这里发送心跳的目的就是利用心跳来告知jobtracker,tasktracker还活着,会指明本身是否已经准备好运行新的任务,若是是,则jobtracker会为它分配一个任务。这里的tasktracker就利用周期性的循环来向jobtracker来“拉活”。
每个tasktracker有固定的map任务槽和reduce任务槽。
选择一个map任务,jobtracker会考虑tasktracker的网络位置,而且选择一个距离其输入分片最近的tasktracker。通常都遵循数据本地化或机架本地化。
选择一个reduce任务,jobtracker简单地从待运行的reduce任务列表中选取下一个来执行,不准要考虑数据的本地化。
任务的执行
当tasktracker初次被分配了一个任务后,就开始要运行该任务。
第一步,从共享文件系统将做业的JAR文件复制到tasktracker所在的文件系统,目的就是实现了做业的JAR文件本地化。而且将应用程序所须要的所有文件从分布式缓存中复制到本地磁盘。
第二步,tasktracker为分配的任务建立一个本地工做目录,将JAR文件内容解压到这个文件夹
第三步,tasktracker建立一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每一个任务。
任务进度的更新
monitorAndPrintJob(),这个方法就是实时的报告做业的运行状况,以打印在控制台上。这个会每隔1秒进行查看,利用的就是Thread.sleep(1000)来执行的。
若是任务报告了进度,则会设置一个标志来代表任务状态发生了变化。在tasktracker中,除了运行任务的线程外,还有个独立的线程每隔3秒会检测任务的状态,若是已经设置,则告知tasktracker当前任务状态。而tasktracker每隔5秒会发送心跳到jobtracker,这里发送心跳的目的主要是报告tasktracker上运行的全部任务的状态。
做业的完成
当jobtracker收到做业的最后一个任务已经完成的通知后,则就会把做业的状态设置为“成功”。此时JobClient会打印一条消息告知用户做业已经完成了。Jobtracker和tasktracker都会清空做业的工做状态。