MapReduce工做的基本流程

     Hadoop0.2以前版本和以后版本在Job中有很大的改进,本次采用的版本是Hadoop1.1.2版本。java

     如今做为做业驱动器,能够直接继承Configured以及实现Tool,这种方式能够很便捷的获取启动时候命令行中输入的做业配置参数,常规的Job启动以下:缓存


public class SortByHash extends Configured implements Tool
{
   public int run(String[] args) throws Exception {
    //这里面负责配置job属性
        Configuration conf=getConf();
        String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs();
        String tradeDir=paths[0];
        String payDir=paths[1];
    String joinDir=paths[2];
    Job job=new Job(conf,"JoinJob");
    job.setJarByClass(JoinMain.class);
    FileInputFormat.addInputPath(job, new Path(tradeDir));
    FileInputFormat.addInputPath(job, new Path(payDir));
    FileOutputFormat.setOutputPath(job, new Path(joinDir));
                                                                                                                                                          
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);
    job.setMapOutputKeyClass(TextIntPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.waitForCompletion(true);
    int exitCode=job.isSuccessful()?0:1;
    return exitCode;
   }
   public static void main(String[] args) throws Exception
   {
      // TODO Auto-generated method stub
      int exitCode=ToolRunner.run(new SortByHashPartitioner(), args);
      System.exit(exitCode);
   }
}


因为Tool的全部实现都须要实现Configurable,Configured又是Configurable的具体实现,因此要同时继承Configured和实现Tool,这样就不须要实现Tool中定义的全部方法了。利用Tool接口来跑MapReduce,能够在命令行中设置一些参数,比硬编码好不少。网络

      注意利用Tool启动做业基本方式以下:ToolRunner首先调用本身的静态方法run,在该方法中会首先建立一个Configurable对象。而后调用GenericOptionsParser解析命令行参数,并设置给刚建立的Configurable对象。而后再次设置主类(这里即SortByHashPartitioner)setConf方法,最后调用主类的run方法执行。因此在run中要想使用命令行参数必须以下:app

       Configurationconf = getConf();分布式

       Jobjob = newJob(conf);ide

       hadoop0.2以后的做业启动是调用job.waitForCompletion(true);的;而后就会进行做业的提交、执行、完成等操做oop

调用waitForCompletion具体的工做流:编码

第一,调用submit提交做业spa

第二,当参数为true的时候,调用monitorAndPrintJob来进行监听做业的进度。命令行

做业提交即submit():

第一,打开一个JobTracker的链接,这里会建立一个JobClient对象

jobClient= new JobClient((JobConf) getConfiguration());

这里的Configuration在初始化建立job的时候就会主动建立的

第二,根据建立的JobClient来调用submitJobInternal()提交做业给系统。

这里会对于命令行中的选项进行检查

1)获取一个做业编号JobIDjobSubmitClient.getNewJobId()jobSubmitclient是一个JobSubmissionProtocolJobTracker就是这个类的子类,JobClient建立的时候就会new一个

2)获取目录的代理,将运行做业须要的资源jar文件,配置文件都复制到一个以做业ID命名的目录下JobTracker文件系统中。

3)检查做业的输出说明。若是没有指定的输出目录或者输出目录已经不存在,则不提交,返回错误

4)建立做业的输入分片。若是分片没法计算,如输入路径不存在,则不提交,报告错误。

5)将该做业写入做业队列中,而后将该文件写入JobTracker的文件系统中

6)全部都经过后,真正的提交做业,调用submitJob()告知JobTracker准备执行做业.

做业初始化

JobTracker接受到submitJob()调用后,会将此调用放入内部队列中queue,交由做业调度器(JobScheduler)进行调度,并对其进行初始化操做。

初始化主要是由做业调度器完成的,建立一个任务运行列表。做业调度器会首先从共享文件系统中获取JobClient已经计算好的分片信息,而后为每个分片建立一个Map任务,建立的reduce数量由mapred.reduce.task来决定,通常是经过setNumReduceTask()设定的。

任务的分配

tasktracker会按期的向jobtracker发送一个心跳告诉是否存活,也是两个之间的通讯通道。这里发送心跳的目的就是利用心跳来告知jobtrackertasktracker还活着,会指明本身是否已经准备好运行新的任务,若是是,则jobtracker会为它分配一个任务。这里的tasktracker就利用周期性的循环来向jobtracker来“拉活”。

每个tasktracker有固定的map任务槽和reduce任务槽。

选择一个map任务,jobtracker会考虑tasktracker的网络位置,而且选择一个距离其输入分片最近的tasktracker。通常都遵循数据本地化或机架本地化。

选择一个reduce任务,jobtracker简单地从待运行的reduce任务列表中选取下一个来执行,不准要考虑数据的本地化。

任务的执行

tasktracker初次被分配了一个任务后,就开始要运行该任务。

第一步,从共享文件系统将做业的JAR文件复制到tasktracker所在的文件系统,目的就是实现了做业的JAR文件本地化。而且将应用程序所须要的所有文件从分布式缓存中复制到本地磁盘。

第二步,tasktracker为分配的任务建立一个本地工做目录,将JAR文件内容解压到这个文件夹

第三步,tasktracker建立一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每一个任务。

任务进度的更新

monitorAndPrintJob(),这个方法就是实时的报告做业的运行状况,以打印在控制台上。这个会每隔1秒进行查看,利用的就是Thread.sleep(1000)来执行的。

若是任务报告了进度,则会设置一个标志来代表任务状态发生了变化。在tasktracker中,除了运行任务的线程外,还有个独立的线程每隔3秒会检测任务的状态,若是已经设置,则告知tasktracker当前任务状态。而tasktracker每隔5秒会发送心跳到jobtracker,这里发送心跳的目的主要是报告tasktracker上运行的全部任务的状态。

做业的完成

jobtracker收到做业的最后一个任务已经完成的通知后,则就会把做业的状态设置为“成功”。此时JobClient会打印一条消息告知用户做业已经完成了。Jobtrackertasktracker都会清空做业的工做状态。

相关文章
相关标签/搜索