MapReduce是Hadoop生态系统的一个重要组成部分,与分布式文件系统HDFS、分布式数据库HBase一块儿合称为传统Hadoop的三驾马车,一块儿构成了一个面向海量数据的分布式系统的基础架构。java
MapReduce是一个用于大规模数据(大于1TB)处理的分布式计算模型、编程模型,它最初是由Google设计并实现的,在Google提出时,给它的定义是:Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。数据库
MapReduce的主要思想“Map(映射)”和“Reduce(规约)”都来自于函数式编程语言。MapReduce极大地方便了编程人员在不会分布式并行编程的状况下,将本身的程序运行在分布式系统之上。用户只须要定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将全部这些中间的有着相同key的values合并起来。不少现实世界中的任务均可用这个模型来表达,具备较强的实用价值。编程
具体来看,MapReuce应当是包含了如下三层含义:服务器
(1)MapReduce是一个基于集群的高性能并行计算平台。经过MapReduce能够将市场上普通的商用服务器构成一个包含数10、数百甚至数千个节点的分布和并行计算集群。架构
(2)MapReduce是一个并行计算与运行软件框架。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通讯、容错处理等并行计算涉及到的不少系统底层的复杂细节交由系统负责处理,大大减小了软件开发人员的负担。app
(3)MapReduce是一个并行程序设计模型与方法。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操做和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。框架
MapReduce的数据处理模型很是简单:map函数和reduce函数的输入和输出都遵循<key,value>键值对的格式,简单的用符号表示就是:编程语言
Map:(K1,V1)——> list(K2,V2)分布式
Reduce:(K2,list< V2>)——> list<K3,V3>函数式编程
Map-Reduce框架的运做彻底基于<key,value>对,即数据的输入是一批<key,value>对,生成的结果也是一批<key,value>对,只是有时候它们的类型不同而已。Key和value的类因为须要支持被序列化(serialize)操做,因此它们必需要实现Writable接口,并且key的类还必须实现WritableComparable接口,使得可让框架对数据集的执行排序操做,后面咱们经过具体的实例来展现它的用法。
MapReduce是Hadoop生态系统的一员,是一个彻底开源的分布式计算系统。MapReduce从第一次提出到今天,并非一成不变的,虽然其主流思想和计算模型没有大的改变,可是整个系统也是在不断的完善和演变的。
首先经典版本的MapReduce框架,也就是初版成熟的商用框架,属于Hadoop的V1.0版本,这个版本的主要特色是简单易用,其思路也比较清晰,各个Client提交Job给一个统一的Job Tracker,而后Job Tracker将Job拆分红N个Task,而后进行分发到各个节点(Node)进行并行协同运行,而后再将各自的运行结果反馈至Job Tracker,进而输出结果。
<div align=center>  </div>
虽然实现简单,可是这个1.0版本存在着其固有的局限性,其中最主要的一点就是:单点故障问题。全部的Job的完成都得益于JobTracker的调度和分配,一旦此节点宕机就意味着整个平台的瘫痪,固然,在实际中大部分经过一个备用机来解决。可是,在一个以分布式运算为特性的框架中,将这种核心的计算集中与一台机器不是一个最优的方案。其次,这个设计扩展性不强,容易形成资源的浪费。
所以,为了减轻单个JobTracker的职责,mapreduce的2.0版本开始引入了YARN做为集群的资源管理器,JobTracker的职责分为两大部分:集群资源管理和任务协调,YARN做为资源管理器,专一于负责整个平台的资源管理,而任务的调度和协调交给下属的任务节点来完成。其主要的运行机制后面具体解析。
目前为止,Hadoop已经发展到了3.0版本,3.0和2.0版本在编程模型和运行机制上没有太大的变化,仍然使用YARN做为其资源管理器,可是在稳定性、存储开销和兼容性等方面有所优化。
下面经过实例来对MapReduce的过程进行说明。
WordCount是Hadoop自带的一个例子,目标是统计文本文件中单词的个数。假设有以下的两个文本文件来运行WorkCount程序:
第一个文件内容:Hello World Bye World
第二个文件内容:Hello Hadoop GoodBye Hadoop
(1)Map数据的输入
MapReduce针对文本文件缺省使用LineRecordReader类来实现读取,一行一个key/value对,key取偏移量,value为行内容。所以,对于给出的文件,假设每一个文件正好是一个分片,那么会有两个Map任务,MapReduce会将其映射为以下所示的键值对做为Map过程的输入。
<div align=center>
Map任务 | key | value |
---|---|---|
map1 | 0(偏移量) | Hello World Bye World |
map2 | 0(偏移量) | Hello Hadoop GoodBye Hadoop |
</div> (2)Map的输出
用户经过定义map函数对输入的键值对进行处理,目标是统计每一个单词的个数,这至关于一个数据预处理的过程,通过处理后,会输出一系列的键值对,键是每一个单词,而值是个数(也就是1)。
<div align=center>
Map任务 | key | value |
---|---|---|
map1 | Hello | 1 |
map1 | World | 1 |
map1 | Bye | 1 |
map1 | World | 1 |
map2 | Hello | 1 |
map2 | Hadoop | 1 |
map2 | GoodBye | 1 |
map2 | Hadoop | 1 |
</div>   这里须要注意,MapReduce还有一个能够在此时用的功能是Combiner,它能够将相同key的值合并起来,它也用Reducer的实现。可是,有些状况,Combiner并不适用,只有中间结果合并以后不会影响最终结果的才可使用Combiner,如这里计算单词次数,这就可使用Comniner,使用后,map1任务中的World会合并为1个,个数成为2,一样map2任务的Hadoop也合并为1个,个数成为2。
(3)Reduce的输入
Map或者Combiner的输出(若是有的话)会经历一个shuffle的过程,这个过程将key相同的数据进行合并,并按照字符顺序进行排序。
如这里Combiner输出进行shuffle以后会获得:
<div align=center>
Key | Value |
---|---|
Bye | [1] |
GoodBye | [1] |
Hadoop | [2] |
Hello | [1,1] |
World | [2] |
</div> (4)Reduce的输出
最后,Reducer实现将相同key的值合并起来,获得最后的结果。
<div align=center>
Key | Value |
---|---|
Bye | 1 |
GoodBye | 1 |
Hadoop | 2 |
Hello | 2 |
World | 2 |
</div>   用图表示以下: <div align=center>  </div> <div align=center>  </div>
这就是一个MapReduce应用的实例,其编程实现也很是简单,用户只须要定义map函数和reduce函数,而后写一个驱动程序来运行做业便可。
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //输出的key的类型,能够理解为String public void map(LongWritable key, Text value, Context context) { String line = value.toString(); //每行句子 StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); //输出 } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { //在这里,reduce步的输入至关于<单词,valuelist>,如<Hello,<1,1>> public void reduce(Text key, Iterable<IntWritable> values,Context context) { int sum = 0; for (IntWritable val : values) sum += val.get(); context.write(key, new IntWritable(sum)); } }
以上就是map函数和reduce函数的实现,逻辑都很简单,最后咱们只须要写一个主函数,设置一个Job做业,进行相对设置,就能够运行,好比以下的Job做业设置了处理该做业的类、做业名字、输入输出数据的路径、map和reduce对应的类、输出结果类型,最后调用执行命令进行执行便可。
Job job = new Job(); // 建立一个做业对象 job.setJarByClass(WordCount.class); // 设置运行/处理该做业的类 job.setJobName("WordCount"); FileInputFormat.addInputPath(job, new Path(args[0])); //设置这个做业输入数据的路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置这个做业输出结果的路径 job.setMapperClass(Map.class); //设置实现了Map步的类 job.setReducerClass(Reduce.class); //设置实现了Reduce步的类 job.setOutputKeyClass(Text.class); //设置输出结果key的类型 job.setOutputValueClass(IntWritable.class); //设置输出结果value的类型 System.exit(job.waitForCompletion(true) ? 0 : 1); //执行做业