MapReduce做业提交源码分析 app
咱们在编写MapReduce程序的时候,首先须要编写Map函数和Reduce函数。完成mapper和reducer的编写后,进行Job的配置;Job配置完成后,调用Job.submit()方法完成做业的提交。那咱们思考一下,Job最终如何完成做业(job)的提交呢?粗略想一下,Job必然须要经过某种方式链接到JobTracker,由于只有这样才能将job提交到JobTracker上进行调度执行。还须要考虑一下,咱们本身编写的mapper和reducer,即Jar文件如何传送到JobTracker上呢?其中有一种最简单也比较直观的方法,直接经过socket传输给JobTracker,由JobTracker再传输给TaskTracker(注意:MapReduce并无采用这种方法)。第三个须要考虑的内容是,JobTracker如何将用户做业的配置转化成map task和reduce task。下面咱们来分析一下MapReduce这些功能的实现。 socket
首先在class Job内部经过JobClient完成做业的提交,最终由JobClient完成与JobTracker的交互功能。在JobClient的构造函数中,经过调用RPC完成与JobTracker链接的创建。 函数
完成创建后,JobClient首先肯定job相关文件的存放位置(咱们上面提到mapreduce没有采用将jar即其余文件传输给JobTracker的方式,而是将这些文件保存到HDFS当中,而且能够根据用户的配置存放多份)。至于该存放目录的分配是经过调用RPC访问JobTracker的方法来进行分配的,下面看一下JobTracker的分配代码: oop
final Path stagingRootDir = new Path(conf.get( 源码分析
"mapreduce.jobtracker.staging.root.dir", spa
"/tmp/hadoop/mapred/staging")); hadoop
final FileSystem fs = stagingRootDir.getFileSystem(conf); get
return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString(); 源码
注意上面代码所生成的stagingRootDir是全部job文件的存放目录,是一个根目录,并不单指当前job。 it
完成job存放目录的分配后,JobClient向JobTracker申请一个JobID(经过RPC,注意基本上JobClient与JobTracker的全部通讯都是经过RPC完成的,若是下文没有显示著名也应该属于这种状况)。
JobID jobId = jobSubmitClient.getNewJobId();
下面是JobTracker.getNewJobId的具体实现:
public synchronized JobID getNewJobId() throws IOException {
return new JobID(getTrackerIdentifier(), nextJobId++);
}
得到JobID后,将该JobID与上面的stagingRootDir组合就构成了Job文件的具体存放地址的构建。进行这些相关工做后,JobClient将相关的文件存储到HDFS当中。