原文连接:pengtuo.tech/大数据研发/2018/…java
本篇文章将会介绍 Hadoop
重要的计算框架 MapReduce
。算法
完整的 MapReduce
框架包含两部分:数据库
map
、shuffle
以及 reduce
三个重要算法组成部分,本篇文章将会介绍这个层面;MapReduce version2
之后,做业都是提交给 YARN
进行管理,因此本文将不会介绍此部分。系列其余文章有:apache
MapReduce
是一个基于 java 的并行分布式计算框架,使用它来编写的数据处理应用能够运行在大型的商用硬件集群上来处理大型数据集中的可并行化问题,数据处理能够发生在存储在文件系统(非结构化)或数据库(结构化)中的数据上。MapReduce
能够利用数据的位置,在存储的位置附近处理数据,以最大限度地减小通讯开销。编程
MapReduce
框架经过编组分布式服务器,并行运行各类任务,管理系统各部分之间的全部通讯和数据传输;其还能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通讯、容错处理等并行计算涉及到的不少系统底层的复杂细节交由系统负责处理,减小开发人员的负担。bash
MapReduce
仍是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操做和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。服务器
MapReduce框架一般由三个操做(或步骤)组成:网络
Map
:每一个工做节点将 map
函数应用于本地数据,并将输出写入临时存储。主节点确保仅处理冗余输入数据的一个副本。Shuffle
:工做节点根据输出键(由 map
函数生成)从新分配数据,对数据映射排序、分组、拷贝,目的是属于一个键的全部数据都位于同一个工做节点上。Reduce
:工做节点如今并行处理每一个键的每组输出数据。MapReduce 流程图: 数据结构
MapReduce
容许分布式运行 Map
操做,只要每一个 Map
操做独立于其余 Map
操做就能够并行执行。app
另外一种更详细的,将 MapReduce
分为5个步骤的理解是:
MapReduce
框架先指定 Map
处理器,而后给其分配将要处理的输入数据 -- 键值对 K1
,并为该处理器提供与该键值相关的全部输入数据;Map()
在 K1
键值对上运行一次,生成由 K2
指定的键值对的输出;K2
键值对,根据『键』是否相同移至相同的工做节点;K2
键值对进行 Reduce()
操做;MapReduce
框架收集全部 Reduce
输出,并按 K2
对其进行排序以产生最终结果进行输出。实际生产环境中,数据颇有多是分散在各个服务器上,对于原先的大数据处理方法,则是将数据发送至代码所在的地方进行处理,这样很是低效且占用了大量的带宽,为应对这种状况,MapReduce
框架的处理方法是,将 Map()
操做或者 Reduce()
发送至数据所在的服务器上,以『移动计算替代移动数据』,来加速整个框架的运行速度,大多数计算都发生在具备本地磁盘上数据的节点上,从而减小了网络流量。
一个 Map
函数就是对一些独立元素组成的概念上的列表的每个元素进行指定的操做,因此每一个元素都是被独立操做的,而原始列表没有被更改,由于这里建立了一个新的列表来保存新的答案。这就是说,Map
操做是能够高度并行的
MapReduce
框架的 Map
和 Reduce
函数都是根据 (key, value)
形式的数据结构定义的。 Map
在一个数据域(Data Domain)中获取一个键值对,而后返回一个键值对的列表:
Map(k1,v1) → list(k2,v2)
复制代码
Map
函数会被并行调用,应用于输入数据集中的每一个键值对(keyed by K1)。而后每一个调用返回一个键值对(keyed by K2)列表。以后,MapReduce
框架从全部列表中收集具备相同 key
(这里是 k2)的全部键值对,并将它们组合在一块儿,为每一个 key
建立一个组。
而 Reduce
是对一个列表的元素进行适当的合并。虽然不如 Map
函数那么并行,可是由于化简老是有一个简单的答案,大规模的运算相对独立,因此化简函数在高度并行环境下也颇有用。Reduce
函数并行应用于每一个组,从而在同一个数据域中生成一组值:
Reduce(k2, list (v2)) → list(v3)
复制代码
Reduce
端接收到不一样任务传来的有序数据组。此时 Reduce()
会根据程序猿编写的代码逻辑进行相应的 reduce
操做,例如根据同一个键值对进行计数加和等。若是Reduce
端接受的数据量至关小,则直接存储在内存中,若是数据量超过了该缓冲区大小的必定比例,则对数据合并后溢写到磁盘中。
前面提到过,Map
阶段有一个分割成组的操做,这个划分数据的过程就是 Partition
,而负责分区的 java 类就是 Partitioner
。
Partitioner
组件可让 Map
对 Key
进行分区,从而将不一样分区的 Key
交由不一样的 Reduce
处理,由此,Partitioner
数量等同于 Reducer
的数量,一个 Partitioner
对应一个 Reduce
做业,可认为其就是 Reduce
的输入分片,可根据实际业务状况编程控制,提升 Reduce
效率或进行负载均衡。MapReduce
的内置分区是HashPartition
。
具备多个分割老是有好处的,由于与处理整个输入所花费的时间相比,处理分割所花费的时间很短。当分割较小时,能够更好的处理负载平衡,可是分割也不宜过小,若是太小,则会使得管理拆分和任务加载的时间在总运行时间中占太高的比重。
下图是 map
任务和 reduce
任务的示意图:
这里给出一个统计词频案例的 Java 代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// 继承 Mapper 类,实现本身的 map 功能
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// map 功能必须实现的函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// 继承 Reducer 类,实现本身的 reduce 功能
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 初始化Configuration,读取mapreduce系统配置信息
Configuration conf = new Configuration();
// 构建 Job 而且加载计算程序 WordCount.class
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
//指定 Mapper、Combiner、Reducer,也就是咱们本身继承实现的类
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 设置输入输出数据
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
复制代码
上述代码会发如今指定 Mapper
以及 Reducer
时,还指定了 Combiner
类,Combiner
是一个本地化的 reduce
操做(所以咱们看见 WordCount
类里是用 reduce
进行加载的),它是 map
运算的后续操做,与 map
在同一个主机上进行,主要是在 map
计算出中间文件前作一个简单的合并重复key值的操做,减小中间文件的大小,这样在后续进行到 Shuffle
时,能够下降网络传输成本,提升网络传输效率。
提交 MR
做业的命令:
hadoop jar {程序的 jar 包} {任务名称} {数据输入路径} {数据输出路径}
复制代码
例如:
hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output
复制代码
上述代码示意图:
Map -> Shuffle -> Reduce 的中间结果,包括最后的输出都是存储在本地磁盘上。
MapReduce
的两大优点是:
1 ) 并行处理:
在 MapReduce
中,咱们将做业划分为多个节点,每一个节点同时处理做业的一部分。所以,MapReduce
基于Divide and Conquer范例,它帮助咱们使用不一样的机器处理数据。因为数据由多台机器而不是单台机器并行处理,所以处理数据所需的时间会减小不少。
2 ) 数据位置:
咱们将计算移动到 MapReduce
框架中的数据,而不是将数据移动到计算部分。数据分布在多个节点中,其中每一个节点处理驻留在其上的数据部分。
这使得具备如下优点:
可是,MapReduce 也有其限制:
wordcount
功能就须要不少的设置和代码量,而 Spark
将会很是简单。