在上一篇文章中咱们已经了解了HDFS的读写流程,HA高可用,联邦和Sequence Files方案,简单回顾一下HDFS的写流程吧html
Client调用Distributed FileSystem的create方法,这个过程是远程调用了NameNode的create方法,此时NameNode就会作四件事情apache
检查本身是否正常运行编程
判断要写进HDFS的文件是否存在api
检查client是否具备建立权限缓存
对这次操做进行日志记录(edits log)服务器
此时create方法会返回一个OutputStream,这个流还需啊哟和NameNode进行交互,调用NameNode的addBlock()方法,以得知这个block须要写在哪些数据节点上。网络
开始写数据时先写在一个chuck上,附带着一个4字节的checkSum,总共516字节,而后再把这些chuck写在一个更大的结构package中,在package被多个chuck写满以后,把package放到一个叫作data queue的队列中,以后所作的事情有两个数据结构
data queue中的package往数据节点DataNode上传输,传输的顺序按照NameNode的addBlock()方法返回的列表依次传输app
往DataNode上传输的同时也往确认队列ack queue上传输框架
针对DataNode中传输完成的数据作一个checkSum,并与本来打包前的checkSum作一个比较
校验成功,就从确认队列ack queue中删除该package,不然该package从新置入data queue重传
完成后经过心跳机制NameNode就能够得知副本已经建立完成,再调用addBlock()方法写以后的文件。
异常的状况就再也不从新说明了,能够直接跳到第二篇进行查看
MapReduce是采用一种分而治之思想设计出来的分布式计算框架
在计算复杂或者计算量大的任务,单台服务器没法胜任时,可将其切分红一个个小的任务,小任务分别在不一样的服务器上并行执行,最终再汇总每一个小任务的结果便可
MapReduce由两个阶段组成,切分红小任务的Map阶段和汇总小任务的Reduce阶段,以下图,须要注意,三个小任务是能够并行执行的
map()函数的输入时键值对,输出的是一系列键值对,输出的结果时写入本地磁盘的
reduce()函数的输入时键值对(即map()函数的输出),输出是一系列键值对,最终写入HDFS
大致逻辑在下面的图很是清晰明了了,shuffle的过程以后再说明
永远都逃不过的词频统计,统计一篇文章中,各个单词出现的次数
从左到右,有一个文件,HDFS对它进行了分块存储,且每个块咱们也能够视为是一个分片(split),而后它提供一个kv对(0,Dear Bear River)过来,key为何是0呢?那这里的0实际上是偏移量,这个偏移量是会随着文件中的数据字节大小进行变化的。在当前例子中暂时咱们还用不上,咱们须要作的只是把做为value的Dear Bear River作一个拆分,而后进行统计,统计完成后开始读第二行的Dear Car,一样输出便可。
以后这个文件分红的3个块都统计好以后,再按照同一个单词汇聚到同一个节点进行统计的方式,得出结果便可
1.咱们能够看到在上图存在着 4 个单词 4 个 reduce task,可是这个reduce task的个数是由开发人员本身决定的,只是一个SetReduceNum(4)的问题
2.为何reduce能够得知究竟有多少个单词,提到shuffle时咱们再说。
3.细心的你应该会发现shufflling事后的那些(Dear,1)有4个,但是key不该该只能存在一个么,这也是shuffle的时候要说的
public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
// 每一个单词出现1次,做为中间结果输出
context.write(new Text(word), new IntWritable(1));
}
}
}
复制代码
这里的LongWritable对应Java里面的Long类型,Text对应String类型,由于分布式框架中数据从一个节点到另外一个节点时会存在序列化和反序列化的问题,因此Hadoop自身提供了一些带有序列化功能的类供咱们使用,也就是平时咱们看到的键值对是(Long,String),在这里就变成了(LongWritable,Text)而已。
以后就是覆写map()方法,实现单词分割,以后把每一个单词做为key,以(word,1)这种状态输出出去。
想要查看这些API方法的话,能够去hadoop官网查看,这里我用的仍是2.7.3,看过上一篇的同窗应该也是知道了
这里有两个Mapper是由于第一个Mapper是老的Mapper,如今已经使用新的了。点击Method以后就能够看到刚刚使用的map()方法了
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/*
key: hello
value: List(1, 1, ...)
*/
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : values) {
sum = sum + count.get();
}
context.write(key, new IntWritable(sum));// 输出最终结果
};
}
复制代码
有了上一个2.2的基础,这个代码就再也不展开说明了,就是把value进行累加,而后得出一个sum,key仍是指单词,以后以(word,sum)这种状态输出出去。
补充:当value中的列表很是大时,会选择提升集群内存或者设置一些读句子时候的限制(自定义InputFormat类,MapReduce默认的是TextInputFormat)把数据大小给减小。
这里的main方法基本每个都是直接拷贝过来而后填填set方法的参数直接用的
public class WordMain {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
if (args.length != 2 || args == null) {
System.out.println("please input Path!");
System.exit(0);
}
Configuration configuration = new Configuration();
// 生成一个job实例
Job job = Job.getInstance(configuration, WordMain.class.getSimpleName());
// 打jar包以后,找程序入口用
job.setJarByClass(WordMain.class);
// 经过job设置输入/输出格式
// MR的默认输入格式就是TextInputFormat,因此注释掉也没问题
//job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置处理Map/Reduce阶段的类
job.setMapperClass(WordMap.class);
job.setReducerClass(WordReduce.class);
//若是map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;若是不同,须要分别设置map, reduce的输出的kv类型
//job.setMapOutputKeyClass(.class)
// 设置最终输出key/value的类型m
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交做业
job.waitForCompletion(true);
}
}
复制代码
运行的方式能够本地运行,能够集群运行,能够maven打包运行也能够,运行结果能够经过yarn查看,由于考虑到你们可能没时间去搭建一个集群玩这里就不贴图了,后面找机会分享一下简答的3节点的集群搭建。
map端的本地聚合,不管运行多少次combiner操做,都不会影响最终的结果
注意:不是全部MapReduce程序都适合使用,好比求average
WordCountMap与WordCountReduce代码不变
WordCountMain中,增长job.setCombinerClass(WordCountReduce.class);
复制代码
键值对一开始的时候是第一张图的样子,如今咱们刚通过Mapping时会存在大量的键值对,它们会经过网络传到对应的Reducing那,若是都是按照(word,1)的格式传输过去,传输的数据量就变得很是巨大,因此这时候最好的方案是先在本地对某一个单词先作一个汇总,也就是combine操做,如图,两个(dear,1)变成了一个(Dear,2),2个(Car,1)变成了(Car,2)等···
map task 输出的时候会输出到一个环形缓冲区中,每个环形缓冲区是100M大小,随着数据的不断读写,让环形缓冲区的内存达到80%,这时候会形成溢出写磁盘,把这些文件写到磁盘中,而这个写到磁盘的操做会经历3个过程
首先是分区,默认状况下是利用key来进行分区操做,MapReduce框架专门提供了一个HashPartitioner用于进行分区操做
环形缓冲区的kv对在落入磁盘前都须要去调用一下getPartition()方法,此时咱们能够看到,它使用了一个比较巧妙的方法:先是计算了一下这个key的hashcode,再模上一个reduce的个数,这种时候咱们看上面的图,reduce的个数是4,那咱们一个数字去模4,结果只会是4个,也就是0,1,2,3,因此这四个结果就会对应不一样的缓冲区
剩下的就是reduce task来进行拉取数据,刚开始时会放到内存当中,放不下的时候也会溢出写到磁盘
固然若是一开始的时候有进行setCombine操做的话就会变成(Dear,4),在图中由于咱们是举例说明,实际状况下每一个分区都有不少不一样的单词,在reduce操做时就会进行合并操做,即相同的key放在一块儿,而后按照字母顺序排序。
combine,merge,和最后的reduce task,这些功能都同样,只不过做用的阶段不一样,方便提高性能。只要达到业务要求就行,有时候一个map就能解决需求,有时候须要map和reduce两个阶段。
以后每个reduce task的结果都会写到HDFS的一个文件里。当map task完成后,后面说yarn的时候会有一个appMaster,作一个轮询的确认,确认完成后再通知reduce task从本地磁盘拉取,有比较多的具体知识须要后续跟进时才会在最后造成一个比较清晰的概念,这也是很是正常的。
MapReduce中根据key进行分区排序和分组,若是如今须要自定义key类型,并自定义key的排序规则,如何实现(结合代码讲解)
public class Person implements WritableComparable<Person> {
private String name;
private int age;
private int salary;
public Person() {
}
public Person(String name, int age, int salary) {
//super();
this.name = name;
this.age = age;
this.salary = salary;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getSalary() {
return salary;
}
public void setSalary(int salary) {
this.salary = salary;
}
@Override
public String toString() {
return this.salary + " " + this.age + " " + this.name;
}
//先比较salary,高的排序在前;若相同,age小的在前
public int compareTo(Person o) {
int compareResult1= this.salary - o.salary;
if(compareResult1 != 0) {
return -compareResult1;
} else {
return this.age - o.age;
}
}
//序列化,将NewKey转化成使用流传送的二进制
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeInt(age);
dataOutput.writeInt(salary);
}
//使用in读字段的顺序,要与write方法中写的顺序保持一致
public void readFields(DataInput dataInput) throws IOException {
//read string
this.name = dataInput.readUTF();
this.age = dataInput.readInt();
this.salary = dataInput.readInt();
}
}
复制代码
讲解内容··
数据倾斜是数据中的常见状况。数据中不可避免地会出现离群值(outlier),并致使数据倾斜。这些离群值会显著地拖慢MapReduce的执行。常见的数据倾斜有如下几类:
在map端和reduce端都有可能发生数据倾斜。在map端的数据倾斜会让多样化的数据集的处理效率更低。在reduce端的数据倾斜经常来源于MapReduce的默认分区器。
数据倾斜会致使map和reduce的任务执行时间大为延长,也会让须要缓存数据集的操做消耗更多的内存资源。
在reduce方法中加入记录map输出键的详细状况的功能
在发现了倾斜数据的存在以后,就颇有必要诊断形成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每一个键的最大值。为了减小追踪量,能够设置数据量阀值,只追踪那些数据量大于阀值的键,并输出到日志中。
Reduce数据倾斜通常是指map的输出数据中存在数据频率倾斜的情况,也就是部分输出键的数据量远远大于其它的输出键
如何减少reduce端数据倾斜的性能损失?
Hadoop默认的分区器是基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就颇有问题。
使用分区器须要首先了解数据的特性。**TotalOrderPartitioner**中,能够经过对原始数据进行抽样获得的结果集来预设分区边界值。TotalOrderPartitioner中的范围分区器能够经过预设的分区边界值进行分区。所以它也能够很好地用在矫正数据中的部分键的数据倾斜问题。
复制代码
另外一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,若是map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就能够将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其余的都发送给剩余的reduce实例。
复制代码
使用Combine能够大量地减少数据频率倾斜和数据大小倾斜。在可能的状况下,combine的目的就是聚合并精简数据。在技术48种介绍了combine。
复制代码
若是链接的数据集太大而不能在map端的链接中使用。那么能够考虑第4章和第7章中介绍的超大数据集的链接优化方案。
复制代码
在map端或reduce端的数据大小倾斜都会对缓存形成较大的影响,乃至致使OutOfMemoryError异常。处理这种状况并不容易。能够参考如下方法。
- 设置mapred.linerecordreader.maxlength来限制RecordReader读取的最大长度。RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。
- 经过org.apache.hadoop.contrib.utils.join设置缓存的数据集的记录数上限。在reduce中默认的缓存记录数上限是100条。
- 考虑使用有损数据结构压缩数据,如Bloom过滤器。
复制代码
MR的没有分篇,篇幅很大,但愿你们可以耐心看完。
根据顺序下一篇是Yarn,走完大数据的这个流程。