MapReduce是咱们再进行离线大数据处理的时候常常要使用的计算模型,MapReduce的计算过程被封装的很好,咱们只用使用Map和Reduce函数,因此对其总体的计算过程不是太清楚,同时MapReduce1.0和MapReduce2.0在网上有不少人混淆。java
Input可是输入文件的存储位置,python
,它也能够是本机上的文件位置。
咱们来仔细分析下input算法
首先咱们知道要和JobTracker打交道是离不开JobClient这个接口的,就如上图所示,bash
而后JobClient中的Run方法 会让 JobClient 把全部 Hadoop Job 的信息,好比 mapper reducer jar path, mapper / reducer class name, 输入文件的路径等等,告诉给 JobTracker,以下面的代码所示:网络
public int run(String[] args) throws Exception { //create job Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); // set run jar class job.setJarByClass(this.getClass()); // set input . output FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1"))); FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2"))); // set map job.setMapperClass(HFile2TabMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // set reduce job.setReducerClass(PutSortReducer.class); return 0; }
除此之外,JobClient.runJob() 还会作一件事,使用 InputFormat类去计算如何把 input 文件 分割成一份一份,而后交给 mapper 处理。inputformat.getSplit() 函数返回一个 InputSplit 的 List, 每个 InputSplit 就是一个 mapper 须要处理的数据。app
一个 Hadoop Job的 input 既能够是一个很大的 file, 也能够是多个 file; 不管怎样,getSplit() 都会计算如何分割 input.分布式
inputFormat它其实是个 interface, 须要 类 来继承,提供分割 input 的逻辑。函数
Jobclient 有一个方法叫 setInputFormat(), 经过它,咱们能够告诉 JobTracker 想要使用的 InputFormat 类 是什么。若是咱们不设置,Hadoop默认的是 TextInputFormat, 它默认为文件在 HDFS上的每个 Block 生成一个对应的 InputSplit. 因此你们使用 Hadoop 时,也能够编写本身的 input format, 这样能够自由的选择分割 input 的算法,甚至处理存储在 HDFS 以外的数据。oop
对于每一个 map任务, 咱们知道它的 split 包含的数据所在的主机位置,咱们就把 mapper 安排在那个相应的主机上好了,至少是比较近的host. 你可能会问:split 里存储的 主机位置是 HDFS 存数据的主机,和 MapReduce 的主机 有什么相关呢?为了达到数据本地性,其实一般把MapReduce 和 HDFS 部署在同一组主机上。测试
既然一个 InputSplit 对应一个 map任务, 那么当 map 任务收到它所处理数据的位置信息,它就能够从 HDFS 读取这些数据了。
map函数接受的是一个 key value 对。
RecordReader 能够被定义在每一个 InputFormat 类中。当咱们经过 JobClient.setInputFormat() 告诉 Hadoop inputFormat 类名称的时候, RecordReader 的定义也一并被传递过来。
因此整个Input,
1.JobClient输入输入文件的存储位置
2.JobClient经过InputFormat接口能够设置分割的逻辑,默认是按HDFS文件分割。
3.Hadoop把文件再次分割为key-value对。
4.JobTracker负责分配对应的分割块由对应的maper处理,同时 RecordReader负责读取key-value对值。
JobClient运行后得到所需的配置文件和客户端计算所得的输入划分信息。并将这些信息都存放在JobTracker专门为该做业建立的文件夹中。文件夹名为该做业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);
而后输入划分信息告诉了JobTracker应该为这个做业启动多少个map任务等信息。
JobTracker经过TaskTracker 向其汇报的心跳状况和slot(状况),每个slot能够接受一个map任务,这样为了每一台机器map任务的平均分配,JobTracker会接受每个TaskTracker所监控的slot状况。
JobTracker接收到做业后,将其放在一个做业队列里,等待做业调度器对其进行调度,看成业调度器根据本身的调度算法调度到该做业时,会根据输入划分信息为每一个划分建立一个map任务,并将map任务分配给TaskTracker执行,分配时根据slot的状况做为标准。
TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着不少的信息,好比当前map任务完成的进度等信息。当JobTracker收到做业的最后一个任务完成信息时,便把该做业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。
Shuffle是咱们不须要编写的模块,但倒是十分关键的模块。
在map中,每一个 map 函数会输出一组 key/value对, Shuffle 阶段须要从全部 map主机上把相同的 key 的 key value对组合在一块儿,(也就是这里省去的Combiner阶段)组合后传给 reduce主机, 做为输入进入 reduce函数里。
Partitioner组件 负责计算哪些 key 应当被放到同一个 reduce 里
HashPartitioner类,它会把 key 放进一个 hash函数里,而后获得结果。若是两个 key 的哈希值 同样,他们的 key/value对 就被放到同一个 reduce 函数里。咱们也把分配到同一个 reduce函数里的 key /value对 叫作一个reduce partition.
咱们看到 hash 函数最终产生多少不一样的结果, 这个 Hadoop job 就会有多少个 reduce partition/reduce 函数,这些 reduce函数最终被JobTracker 分配到负责 reduce 的主机上,进行处理。
咱们知道map阶段可能会产生多个spill file 当 Map 结束时,这些 spill file 会被 merge 起来,不是 merge 成一个 file,而是也会按 reduce partition 分红多个。
当 Map tasks 成功结束时,他们会通知负责的 tasktracker, 而后消息经过 jobtracker 的 heartbeat 传给 jobtracker. 这样,对于每个 job, jobtracker 知道 map output 和 map tasks 的关联。Reducer 内部有一个 thread 负责按期向 jobtracker 询问 map output 的位置,直到 reducer 获得全部它须要处理的 map output 的位置。
Reducer 的另外一个 thread 会把拷贝过来的 map output file merge 成更大的 file. 若是 map task 被 configure 成须要对 map output 进行压缩,那 reduce 还要对 map 结果进行解压缩。当一个 reduce task 全部的 map output 都被拷贝到一个它的 host上时,reduce 就要开始对他们排序了。
排序并非一次把全部 file 都排序,而是分几轮。每轮事后产生一个结果,而后再对结果排序。最后一轮就不用产生排序结果了,而是直接向 reduce 提供输入。这时,用户提供的 reduce函数 就能够被调用了。输入就是 map 任务 产生的 key value对.
reduce() 函数以 key 及对应的 value 列表做为输入,按照用户本身的程序逻辑,经合并 key 相同的 value 值后,产 生另一系列 key/value 对做为最终输出写入 HDFS