MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,一般采用的处理手法就是并行计算。但对许多开发者来讲,本身完彻底全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,它使得那些没有多有多少并行计算经验的开发人员也能够开发并行应用程序。这也就是MapReduce的价值所在,经过简化编程模型,下降了开发并行应用的入门门槛。html
Hadoop MapReduce是一个软件框架,基于该框架可以容易地编写应用程序,这些应用程序可以运行在由上千个商用机器组成的大集群上,并以一种可靠的,具备容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集。java
所以,对于MapReduce,能够简洁地认为,它是一个软件框架,海量数据是它的“菜”,它在大规模集群上以一种可靠且容错的方式并行地“烹饪这道菜”。程序员
简单地讲,MapReduce能够作大数据处理。所谓大数据处理,即以价值为导向,对大数据加工、挖掘和优化等各类处理。apache
MapReduce擅长处理大数据,它为何具备这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。编程
(1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务能够并行计算,彼此间几乎没有依赖关系。网络
(2)Reducer负责对map阶段的结果进行汇总。至于须要多少个Reducer,用户能够根据具体问题,经过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。app
一个比较形象的语言解释MapReduce: 框架
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.分布式
咱们要数图书馆中的全部书。你数1号书架,我数2号书架。这就是“Map”。咱们人越多,数书就更快。ide
Now we get together and add our individual counts. That’s reduce.
如今咱们到一块儿,把全部人的统计数加在一块儿。这就是“Reduce”。
MapReduce的整个工做过程如上图所示,它包含以下4个独立的实体:
实体一:客户端,用来提交MapReduce做业。
实体二:JobTracker,用来协调做业的运行。
实体三:TaskTracker,用来处理做业划分后的任务。
实体四:HDFS,用来在其它实体间共享做业文件。
经过审阅MapReduce的工做流程图,能够看出MapReduce整个工做过程有序地包含以下工做环节:
在Hadoop中,一个MapReduce做业一般会把输入的数据集切分为若干独立的数据块,由Map任务以彻底并行的方式去处理它们。框架会对Map的输出先进行排序,而后把结果输入给Reduce任务。一般做业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及从新执行已经关闭的任务。
一般,MapReduce框架和分布式文件系统是运行在一组相同的节点上,也就是说,计算节点和存储节点一般都是在一块儿的。这种配置容许框架在那些已经存好数据的节点上高效地调度任务,这可使得整个集群的网络带宽被很是高效地利用。
(1)JobTracker
JobTracker负责调度构成一个做业的全部任务,这些任务分布在不一样的TaskTracker上(由上图的JobTracker能够看到2 assign map 和 3 assign reduce)。你能够将其理解为公司的项目经理,项目经理接受项目需求,并划分具体的任务给下面的开发工程师。
(2)TaskTracker
TaskTracker负责执行由JobTracker指派的任务,这里咱们就能够将其理解为开发工程师,完成项目经理安排的开发任务便可。
MapReduce框架运转在<key,value>键值对上,也就是说,框架把做业的输入当作是一组<key,value>键值对,一样也产生一组<key,value>键值对做为做业的输出,这两组键值对有多是不一样的。
一个MapReduce做业的输入和输出类型以下图所示:能够看出在整个流程中,会有三组<key,value>键值对类型的存在。
这里以WordCount单词计数为例,介绍map和reduce两个阶段须要进行哪些处理。单词计数主要完成的功能是:统计一系列文本文件中每一个单词出现的次数,如图所示:
(1)map任务处理
(2)reduce任务处理
WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码能够在Hadoop安装包的src/examples目录下找到。
WordCount单词计数主要完成的功能是:统计一系列文本文件中每一个单词出现的次数;
首先在Linux中经过Vim编辑一个简单的words.txt,其内容很简单以下所示:
Hello Edison Chou
Hello Hadoop RPC
Hello Wncud Chou
Hello Hadoop MapReduce
Hello Dick Gu
经过Shell命令将其上传到一个指定目录中,这里指定为:/testdir/input
在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。
咱们要作的就是覆盖map 函数和reduce 函数,首先咱们来覆盖map函数:继承Mapper类并重写map方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k1 表示每一行的起始位置(偏移量offset) * @param VALUEIN * →v1 表示每一行的文本内容 * @param KEYOUT * →k2 表示每一行中的每一个单词 * @param VALUEOUT * →v2 表示每一行中的每一个单词的出现次数,固定值为1 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); } }; }
Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;
从代码中能够看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是由于HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。
这里的map函数中经过空格符号来分割文本内容,并对其进行记录;
如今咱们来覆盖reduce函数:继承Reducer类并重写reduce方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k2 表示每一行中的每一个单词 * @param VALUEIN * →v2 表示每一行中的每一个单词的出现次数,固定值为1 * @param KEYOUT * →k3 表示每一行中的每一个单词 * @param VALUEOUT * →v3 表示每一行中的每一个单词的出现次数之和 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }; }
Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型一般和map的输出key、value类型保持一致)和输出的key、value 类型。
这里的reduce函数主要是将传入的<k2,v2>进行最后的合并统计,造成最后的统计结果。
(1)设定输入目录,固然也能够做为参数传入
public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
(2)设定输出目录(输出目录须要是空目录),固然也能够做为参数传入
public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
(3)Main函数的主要代码
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先删除输出路径的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定输入目录 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定对输入数据进行格式化处理的类(能够省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定义的Mapper类 job.setMapperClass(MyMapper.class); // 1.3:指定map输出的<K,V>类型(若是<k3,v3>的类型与<k2,v2>的类型一致则能够省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分区(能够省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:设置要运行的Reducer的数量(能够省略) job.setNumReduceTasks(1); // 1.6:指定自定义的Reducer类 job.setReducerClass(MyReducer.class); // 1.7:指定reduce输出的<K,V>类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定输出目录 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定对输出数据进行格式化处理的类(能够省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交做业 boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } }
在Main函数中,主要作了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交做业;匆匆看下来,代码有点多,但有些实际上是能够省略的。
(4)完整代码以下所示
package mapreduce; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyWordCountJob { /** * @author Edison Chou * @version 1.0 * @param KEYIN * →k1 表示每一行的起始位置(偏移量offset) * @param VALUEIN * →v1 表示每一行的文本内容 * @param KEYOUT * →k2 表示每一行中的每一个单词 * @param VALUEOUT * →v2 表示每一行中的每一个单词的出现次数,固定值为1 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); } }; } /** * @author Edison Chou * @version 1.0 * @param KEYIN * →k2 表示每一行中的每一个单词 * @param VALUEIN * →v2 表示每一行中的每一个单词的出现次数,固定值为1 * @param KEYOUT * →k3 表示每一行中的每一个单词 * @param VALUEOUT * →v3 表示每一行中的每一个单词的出现次数之和 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }; } // 输入文件路径 public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt"; // 输出文件路径 public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先删除输出路径的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定输入目录 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定对输入数据进行格式化处理的类(能够省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定义的Mapper类 job.setMapperClass(MyMapper.class); // 1.3:指定map输出的<K,V>类型(若是<k3,v3>的类型与<k2,v2>的类型一致则能够省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分区(能够省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:设置要运行的Reducer的数量(能够省略) job.setNumReduceTasks(1); // 1.6:指定自定义的Reducer类 job.setReducerClass(MyReducer.class); // 1.7:指定reduce输出的<K,V>类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定输出目录 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定对输出数据进行格式化处理的类(能够省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交做业 boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } } }
(1)调试查看控制台状态信息
(2)经过Shell命令查看统计结果
Hadoop有个ToolRunner类,它是个好东西,简单好用。不管在《Hadoop权威指南》仍是Hadoop项目源码自带的example,都推荐使用ToolRunner。
下面咱们看下src/example目录下WordCount.java文件,它的代码结构是这样的:
public class WordCount { // 略... public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // 略... Job job = new Job(conf, "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); } }
WordCount.java中使用到了GenericOptionsParser这个类,它的做用是将命令行中参数自动设置到变量conf中。举个例子,好比我但愿经过命令行设置reduce task数量,就这么写:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
上面这样就能够了,不须要将其硬编码到java代码中,很轻松就能够将参数与代码分离开。
至此,咱们尚未说到ToolRunner,上面的代码咱们使用了GenericOptionsParser帮咱们解析命令行参数,编写ToolRunner的程序员更懒,它将 GenericOptionsParser调用隐藏到自身run方法,被自动执行了,修改后的代码变成了这样:
public class WordCount extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { Job job = new Job(getConf(), "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }
看看这段代码上有什么不一样:
(1)让WordCount继承Configured并实现Tool接口。
(2)重写Tool接口的run方法,run方法不是static类型,这很好。
(3)在WordCount中咱们将经过getConf()获取Configuration对象。
能够看出,经过简单的几步,就能够实现代码与配置隔离、上传文件到DistributeCache等功能。修改MapReduce参数不须要修改java代码、打包、部署,提升工做效率。
public class MyJob extends Configured implements Tool { public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... } }; } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... }; } // 输入文件路径 public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt"; // 输出文件路径 public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount"; @Override public int run(String[] args) throws Exception { // 首先删除输出路径的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "WordCount"); // 设置输入目录 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 设置自定义Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 设置自定义Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 设置输出目录 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }
(1)王路情,《Hadoop之MapReduce》:http://blog.csdn.net/wangloveall/article/details/21407531
(2)Suddenly,《Hadoop日记之MapReduce》:http://www.cnblogs.com/sunddenly/p/3985386.html
(3)伯乐在线,《我是如何向老婆解释MapReduce的》:http://blog.jobbole.com/1321/
(4)codingwu,《MapReduce原理与设计思想》:http://www.cnblogs.com/archimedes/p/mapreduce-principle.html
(5)codingwu,《MapReduce实例浅析》:http://www.cnblogs.com/archimedes/p/mapreduce-example-analysis.html
(6)挑灯看剑,《图解MapReduce原理和执行过程》:http://blog.csdn.net/michael_kong_nju/article/details/23826979
(7)万川梅、谢正兰,《Hadoop应用开发实战详解(修订版)》:http://item.jd.com/11508248.html
(8)张月,《Hadoop MapReduce开发最佳实践》:http://www.infoq.com/cn/articles/MapReduce-Best-Practice-1