本文系原创,如有转载须要,请注明出处。https://www.cnblogs.com/bigdata-stone/java
MapReduce是面向大数据并行处理的计算模型、框架和平台。算法
在 MapReduce 程序的开发过程当中,每每须要用到 FileInputFormat与 TextInputFormat,咱们会发现 TextInputFormat 这个类继承自FileInputFormat , FileInputFormat 这 个 类 继 承 自 InputFormat ,InputFormat 这个类会将文件 file 按照逻辑进行划分,划分红的每个split 切片将会被分配给一个 Mapper 任务,文件先被切分红 split 块,然后每个 split 切片对应一个 Mapper 任务 。缓存
input File 经过 split 被逻辑切分为多个 split 文件,经过 Record按行读取内容给 map(用户本身实现的)进行处理,数据被 map 处理结束以后交给 OutputCollector 收集器,对其结果 key 进行分区(默认使用 hash 分区),而后写入 buffer,每一个 map task 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 map task 结束后再对磁盘中这个 maptask 产生的全部临时文件作合并,生成最终的正式输出文件,而后等待 reduce task 来拉数据。 Map 端的输入的(k,v)分别是该行的起始偏移量,以及每一行的数据内容,map 端的输出(k,v)能够根据需求进行自定义,可是若是输出的是 javabean 对象,须要对javabean 继承 writable 。网络
分区函数partitioner 的做用是将 mapper输出的 key/value经过给定的分区函数来拆分为分片(shard),每一个 reducer 对应一个分片 默认状况下, partitioner 先计算 key 的散列值(一般为 md5值)。而后通reducer 个数执行取模运算: key.hashCode%(reducer 个数)。这种方式不只可以随机地将整个key空间平均分发给每一个reducer,同时也能确保不一样mapper产生的相同key能被分发到同一个reducer。也能够自定义分区去继承 partition<key,value>把不一样的结果写入不一样的文件中分区 Partitioner 主要做用在于如下两点 (1)根据业务须要,产生多个输出文件;(2)多个 reduce 任务并发运行,提升总体 job 的运行效率 map 端的 combine 组件。并发
每个 map 均可能会产生大量的本地输出, Combiner 的做用就是对 map 端的输出先作一次合并,以减小在 map 和 reduce 节点之间的数据传输量,以提升网络 IO 性能,是 MapReduce 的一种优化手段之一combiner 是 MR 程序中 Mapper 和 Reducer 以外的一种组件combiner 组件的父类就是 Reducercombiner 和 reducer 的区别在于运行的位置:combiner 是在每个 maptask 所在的节点运行reducer 是接收全局全部 Mapper 的输出结果;combiner 的意义就是对每个 maptask 的输出进行局部汇总,以减少网络传输量具体实现步骤:app
1)自定义一个 combiner 继承 Reducer,重写 reduce 方法框架
2)中设置: job.setCombinerClass(CustomCombiner.class)combiner 可以应用的前提是不能影响最终的业务逻辑,并且,combine输出 kv 应该跟 reducer 的输入 kv 类型要对应起来 分布式
Combiner 使用须要注意的是:ide
1.有不少人认为这个 combiner 和 map 输出的数据合并是一个过程,其实否则, map 输出的数据合并只会产生在有数据 spill 出的时候,即进行 merge 操做。函数
2.与 mapper 与 reducer 不一样的是, combiner 没有默认的实现,须要显式的设置在 conf 中才有做用。
3.并非全部的 job 都适用 combiner,只有操做知足结合律的才可设置 combiner。 combine 操做相似于: opt(opt(1, 2, 3), opt(4, 5,6))。若是 opt 为求和、求最大值的话,可使用,可是若是是求中值的话,不适用。
4.通常来讲, combiner 和 reducer 它们俩进行一样的操做。
shuffle 的过程是:Map 产生输出开始到 Reduc 取得数据做为输入以前的过程称做 shuffle.1).Collect 阶段:将 MapTask 的结果输出到默认大小为100M 的环形缓冲区,保存的是 key/value, Partition 分区信息等。2).Spill 阶段:当内存中的数据量达到必定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘以前须要对数据进行一次排序的操做,若是配置了 combiner,还会将有相同分区号和 key 的数据进行排序。3).Merge 段把全部溢出的临时文件进行一次合并操做,以确保一个MapTask 最终只产生一个中间数据文件。4).Copy 阶段: ReduceTask 启动 Fetcher 线程到已经完成MapTask 的节点上复制一份属于本身的数据,这些数据默认会存在内存的缓冲区中,当内存的缓冲区达到必定的阀值的时候,就会将数据写到磁盘之上。5).Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操做。6).Sort 阶段:在对数据进行合并的同时,会进行排序操做,因为 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终总体有效性便可。Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快缓冲区的大小能够经过参数调整, 参数:io.sort.mb 默认 100M
reducer 将已经分好组的数据做为输入,并依次为每一个键对应分组执行 reduce 函数。 reduce 函数的输入是键以及包含与该键对应的全部值的迭代器。reduce 端的输入是 map 端的输出,它的输出的(k,v)根据需求进行自定义reducetask 并行度一样影响整个 job 的执行并发度和执行效率,与maptask的并发数由切片数决定不一样, Reducetask 数量的决定是能够直接手动设置:job.setNumReduceTasks(4);若是数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。默认的 reduceTask 的是 1 。
OutputFormat 主要用于描述输出数据的格式,它可以将用户提供的 key/value对写入特定格式的文件中。 Hadoop 自带了不少 OutputFormat 的实现,它们与InputFormat 实现相对应,足够知足咱们业务的须要。
Map端代码
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyout = new Text();
IntWritable valueout = new IntWritable();
String[] arr = value.toString().split(" ");
for(String s:arr){
keyout.set(s);
valueout.set(1);
context.write(keyout,valueout);
}
}
}
reduce端代码:
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0 ;
for(IntWritable iw:values){
count=iw.get()+count;
}
context.write(key,new IntWritable(count));
}
}
app端代码:
public class WCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf= new Configuration();
// conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJobName("WCApp");
job.setJarByClass(WCApp.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
程序运行时过程设计到的一个角色实体
1.1. Client:编写mapreduce程序,配置做业,提交做业的客户端 ;
1.2. ResourceManager:集群中的资源分配管理 ;
1.3. NodeManager:启动和监管各自节点上的计算资源 ;
1.4. ApplicationMaster:每一个程序对应一个AM,负责程序的任务调度,自己也是运行在NM的Container中 ;
1.5. HDFS:分布式文件系统,保存做业的数据、配置信息等等。
客户端提交Job
2.1. 客户端编写好Job后,调用Job实例的Submit()或者waitForCompletion()方法提交做业;
2.2. 客户端向ResourceManager请求分配一个Application ID,客户端会对程序的输出、输入路径进行检查,若是没有问题,进行做业输入分片的计算。
Job提交到ResourceManager
3.1. 将做业运行所须要的资源拷贝到HDFS中(jar包、配置文件和计算出来的输入分片信息等);
3.2. 调用ResourceManager的submitApplication方法将做业提交到ResourceManager。
给做业分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的调用以后会命令一个NodeManager启动一个Container ;
4.2. 在该NodeManager的Container上启动管理该做业的ApplicationMaster进程。
ApplicationMaster初始化做业
5.1. ApplicationMaster对做业进行初始化操做;
5.2. ApplicationMaster从HDFS中得到输入分片信息(map、reduce任务数)
任务分配
6.1. ApplicationMaster为其每一个map和reduce任务向RM请求计算资源;
6.2. map任务优先于reduce任,map数据优先考虑本地化的数据。任务执行,在 Container 上启动任务(经过YarnChild进程来运行),执行map/reduce任务。
输入分片(input split)
每一个输入分片会让一个map任务来处理,默认状况下,以HDFS的一个块的大小(默认为128M,能够设置)为一个分片。map输出的结果会暂且放在一个环形内存缓冲区中(mapreduce.task.io.sort.mb=100M
),当该缓冲区快要溢出时(默认mapreduce.map.sort.spill.percent=0.8
),会在本地文件系统中建立一个溢出文件,将该缓冲区中的数据写入这个文件;
map阶段:由咱们本身编写,最后调用 context.write(…);
partition分区阶段
3.1. 在map中调用 context.write(k2,v2)方法输出,该方法会马上调用 Partitioner类对数据进行分区,一个分区对应一个 reduce task。
3.2. 默认的分区实现类是 HashPartitioner ,根据k2的哈希值 % numReduceTasks
,可能出现“数据倾斜”现象。
3.3. 能够自定义 partition ,调用 job.setPartitioner(…)本身定义分区函数。
combiner合并阶段:将属于同一个reduce处理的输出结果进行合并操做
4.1. 是可选的;
4.2. 目的有三个:1.减小Key-Value对;2.减小网络传输;3.减小Reduce的处理。
shuffle阶段:即Map和Reduce中间的这个过程
5.1. 首先 map 在作输出时候会在内存里开启一个环形内存缓冲区,专门用来作输出,同时map还会启动一个守护线程;
5.2. 如缓冲区的内存达到了阈值的80%,守护线程就会把内容写到磁盘上,这个过程叫spill,另外的20%内存能够继续写入要写进磁盘的数据;
5.3. 写入磁盘和写入内存操做是互不干扰的,若是缓存区被撑满了,那么map就会阻塞写入内存的操做,让写入磁盘操做完成后再继续执行写入内存操做;
5.4. 写入磁盘时会有个排序操做,若是定义了combiner函数,那么排序前还会执行combiner操做;
5.5. 每次spill操做也就是写入磁盘操做时候就会写一个溢出文件,也就是说在作map输出有几回spill就会产生多少个溢出文件,等map输出所有作完后,map会合并这些输出文件,这个过程里还会有一个Partitioner操做(如上)
5.6. 最后 reduce 就是合并map输出文件,Partitioner会找到对应的map输出文件,而后进行复制操做,复制操做时reduce会开启几个复制线程,这些线程默认个数是5个(可修改),这个复制过程和map写入磁盘过程相似,也有阈值和内存大小,阈值同样能够在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操做和合并文件操做,这些操做完了就会进行reduce计算了。
reduce阶段:由咱们本身编写,最终结果存储在hdfs上的。