本文是对mapreduce技术的一个初步学习的总结,包括以下章节的内容:css
参考资料:java
一、本文介绍的内容依赖hadoop环境,关于hadoop运行环境的搭建可参见《Hadoop运行环境搭建》。程序员
二、mapreduce的编程模型设计受到了函数式编程中的的map和reduce原语的启发,为了有助于更好的理解mapreduce的编程模型,可先阅读《函数式编程之集合操做》。sql
大数据的应用,有两个最核心的任务要处理,一是海量数据的存储,二是对海量数据的分析和处理。hadoop分别提供了分布式文件系统hdfs和分布式计算框架mapreduce来解决。其中mapreduce是来解决海量数据的分析和处理的。数据库
可是咱们在实际的开发中,不多会去编写mapreduce代码来进行大数据的分析处理。咱们更多的是听到的是诸如hive, spark,storm这些技术,更多的大数据学习者也是学习这些技术。这是为何呢?apache
这主要是有两个缘由,一是mapreduce是一种适合离线数据分析的技术,其效率上比较低,不能知足一些低时延需求的数据分析业务,低时延的数据处理每每采用诸如spark,storm这些技术。编程
其次,使用mapreduce须要根据业务场景来设计map和reduce的处理逻辑,编写java代码,并提交到集群上执行,这属于比较底层的操做,对程序员的要求较高。而hive等技术借鉴了关系数据库的特色,提供你们很熟悉的类sql机制,可让程序员以较低门槛的方式来处理大数据。swift
那为何咱们还要来学习mapreduce呢?首先它是大数据处理的最先解决方案或者说是鼻祖,并且是hive等技术的基础(Hive是将类sql语句最终转换成mapreduce程序来处理),学习它,有助于加深对hive等技术的使用。其次数据处理的思路是相同的,了解mapreduce的机制和原理,对熟悉其它大数据分析处理技术(如spark,storm,impala等)也是有帮助的。最后,虽然如今直接编写mapreduce程序不多了,但在某些应用场景下,编写mapreduce程序就是很好的解决方案。综上所说,做为一个大数据技术的学习者,是很是有必要来学习mapreduce技术的。api
mapreduce是跟随hadoop一块儿推出的,分为第一代(称为 MapReduce 1.0或者MRv1,对应hadoop第1代)和第二代(称为MapReduce 2.0或者MRv2,对应hadoop第2代)。app
第一代MapReduce计算框架,它由两部分组成:编程模型(programming model)和运行时环境(runtime environment)。它的基本编程模型是将问题抽象成Map和Reduce两个阶段,其中Map阶段将输入数据解析成key/value,迭代调用map()函数处理后,再以key/value的形式输出到本地目录,而Reduce阶段则将key相同的value进行规约处理,并输出最终结果。它的运行时环境由两类服务组成:JobTracker和TaskTracker,其中,JobTracker负责资源管理和全部做业的控制,而TaskTracker负责接收来自JobTracker的命令并执行它。
MapReduce 2.0或者MRv2具备与MRv1相同的编程模型,惟一不一样的是运行时环境。MRv2是在MRv1基础上经加工以后,运行于资源管理框架YARN之上的MRv1,它再也不由JobTracker和TaskTracker组成,而是变为一个做业控制进程ApplicationMaster,且ApplicationMaster仅负责一个做业的管理,至于资源的管理,则由YARN完成。
总结下,MRv1是一个独立的离线计算框架,而MRv2则是运行于YARN之上的MRv1。
做为MapReduce 程序的开发人员,尤为是初学者,咱们在了解其原理的基础上,重要的是学会如何使用框架提供的api去编写代码。MapReduce 的api分为新旧两套,新旧api位于不一样的java包中。其中旧的api位于org.apache.hadoop.mapred包(子包)中,新的api位于org.apache.hadoop.mapreduce包(子包)中。本文介绍的例子使用的都是新的api。
咱们编写mapreduce程序(后面用mr来简称mapreduce)是用来进行数据处理的,每次数据的处理咱们称为一个mr做业(或任务)。一个mr任务的处理过程分为两个阶段:map阶段 和 reduce阶段。
每一个阶段都以键值对(key-value对)做为输入和输出,其数据类型是由程序员来选择的,即在代码中设置的。其程序执行的基本的过程以下:
一、mr框架读取待处理的数据(通常来自HDFS文件),生成map阶段全部的key-value数据集合,交由map阶段处理。
二、map阶段处理上面的key-value数据,生成新的key-value数据集合。map阶段的处理的核心就是由框架调用一个程序员编写的map函数来处理。每一个输入的键值对都会调用一次map函数来处理,map函数的输出结果也是key-value键值对。
三、框架对全部map阶段的输出数据进行排序和分组(这过程称为shuffle),生成新的key-value数据集合,交由reduce处理。
四、reduce阶段会对数据进行操做,最后也是生成key-value数据,这也是mr任务最终的输出结果。reduce阶段的处理的核心就是框架调用一个程序员编写的reduce函数来处理。每一个输入的键值对都会调用一次reduce函数来处理,reduce函数的输出结果也是key-value键值对,就是最终的结果。
咱们下面经过一个具体的例子来进一步理解mr的运行机制。该例子是,从文件中统计单词重复出现的次数。假设输入的文件中的内容以下:
mary jack
this is jack he is mary
对于上述待处理文件,mr框架会读取文件中的内容,生成给map处理的key-value集合,生成的数据内容以下(注意下面只是示意,不是实际的存储格式):
(1, mary jack) (10,this is jack) (22,he is mary)
对于文本文件,在默认状况下,mr框架生成的key-value的key是每行的首字符在文件中的位置,value是每行的文本,如上面的数据。
对于上面的每对key-value数据,会交给map处理,本例子是为了获取单词重复的次数,首先须要将单词区分出来,显然,map阶段能够用来干这个事,这样咱们map阶段能够有这样的输出(咱们这里先直接给出结果,后面会有具体的代码):
(mary,1) (jack,1) (this,1) (is,1) (jack,1) (he,1) (is,1) (mary,1)
也就是map阶段输出的key-value对是每一个单词,其中key是单词自己,value是固定值为1。
map处理后,框架会对Map输出的Key-value数据基于key来对数据进行排序和分组(这过程称为shuffle),结果数据以下:
(mary,[1,1]) (jack,[1,1]) (this,[1]) (is,[1,1]) (he,[1])
能够看出,shuffle操做的结果是,将map输出数据中相同Key的value进行合并成列表,生成新的key-value数据,传给reduce。
这样,reduce要作的事情就很简单了,就是将每对key-value数据中的value中的各个元素的值汇总便可。输出结果如:
he 1 is 2 jack 2 mary 2 this 1
上面就是整个Mr程序最终的输出结果。
Mr程序是用来计算海量数据的,提交一次Mr任务到集群上,通常会由多个map来同时处理(每一个map位于一个节点上)。
框架会将待处理的数据分红1个或多个“输入分片(split)”,每一个map只处理一个分片,每一个分片被划分为若干条记录,每条记录就是一个键/值对,map就是一个接一个的处理这些键/值对,也就是说,对于每一个键/值对,Map函数都会被执行一次,这个分片中有多少个键/值对,该map类中的map函数就会被调用多少次。
reduce任务的数量不是由输入数据的大小决定的,而是由程序员在代码中指定的,默认是1。若是是1,则map全部的输出数据都由该reduce节点来处理。若是是多个,则由框架将全部map的输出数据依据必定规则分为各个部分交给各个reduce分别处理。
下图是一个mr任务的数据流程图,能够比较清晰的展现上面描述的过程。
(摘自Hadoop权威指南一书)
编写一个简单的mr程序,通常至少须要编写3个类,分别是:
一、Mapper类的一个继承类,用于实现map函数;
二、Reducer类的实现类,用于实现reduce函数;
三、程序入口类(带main方法的),用于编写mr做业运行的一些代码。
下面咱们以上一节提到的统计单词重复次数的例子来介绍如何编写Mr程序代码。hadoop版本中也自带了这个例子的代码,具体位置位于hadoop安装目录下的
share\hadoop\mapreduce\sources\hadoop-mapreduce-examples-2.7.6-sources.jar文件中。
第一,首先编写Mapper类的继承类,重写map函数,类的完整代码以下:
package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class CountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable intValue = new IntWritable(1); private Text keyword = new Text(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values = value.toString().split(" "); for (String item : values) { if(item.isEmpty()){ continue; } keyword.set(item); context.write(keyword, intValue); } } }
Mapper类是一个泛型类型,它有4个形参类型,须要程序员来指定,这4个类型按顺序分别是 输入给map函数的键值对数据的key的类型和value的类型,以及map输出键值对数据的key的类型和value的类型。由于对于单词统计这个例子,map输入的key为数值(对应mr中的类型为LongWritable,相似java中的long类型),value为字符串(对应mr中的类型为Text,相似java中的String类型),map输出的key类型是字符串,value类型是数值。
Mapper类的map方法(函数)有3个参数,前2个参数对应map输入键值对数据的key的类型和value的类型,即和Mapper类的前两个泛型参数一致。第3个参数是Context 类型,用于写入map函数处理后要输出的结果。
map方法的处理逻辑很简单,输入的key不关心,把输入的value(即每行数据)进行字符串split操做得到字符串中的各个单词,而后经过Context 类的write方法将结果输出。
第二,而后编写Reducer类的继承类,重写reduce函数,类的完整代码以下:
package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class CountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable wordNum = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : value) { sum += val.get(); } wordNum.set(sum); context.write(key, wordNum); } }
Reducer类也是一个泛型类,同Mapper类相似,也有4个参数类型,用于指定输入和输出类型。须要注意的是,Reducer类的输入类型必须匹配Mapper类的输出类型,这个很好理解,由于Map的输出就是reduce的输入。
reduce方法(函数)也有3个参数,第1个参数是输入键值对的键,第2个参数是一个迭代器,对应map输出后由框架进行shuffle操做后的值的集合,第3个参数Contex用于写输出结果的。
reduce方法的逻辑也比较简单,由于咱们要统计单词的重复个数,因此就对第2个参数进行遍历,算出总数便可。而后按照key-value的方式经过Context参数输出。
第三,有了map和reduce代码,还须要编写一个java入口类,用于完成Mr任务的相关设置,完整代码以下:
package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CountMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); //指定本job所在的jar包 job.setJarByClass(CountMain.class); //设置本job所用的mapper逻辑类和reducer逻辑类 job.setMapperClass(CountMap.class); job.setReducerClass(CountReduce.class); //设置最终输出的kv数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置输入和输出文件的路径 FileInputFormat.setInputPaths(job, new Path("实际输入路径或文件")); FileOutputFormat.setOutputPath(job, new Path("实际输出路径")); //提交job给hadoop集群,等待做业完成后main方法才会结束 job.waitForCompletion(true); } }
上面建立的类是一个普通的带main方法的java类,在main方法中,对本Mr任务进行相关设置。上面代码中的设置是最小设置,不少设置采用的是默认值。下面对上面代码进行一一的解释。
首先经过Job.getInstance()建立一个Job对象,该对象用于进行做业信息的设置,用于控制整个做业的运行。
要想做业在集群中运行,须要把代码打包成一个Jar包文件(mr框架会在整个集群上发布整个Jar文件),咱们须要在代码中经过setJarByClass方法传递一个类,这样mr框架就能根据这个类来查找到相关的jar文件。
而后调用setMapperClass和setReducerClass方法指定本做业执行所须要的Mapper类和Reducer类。
还须要调用setOutputKeyClass和setOutputValueClass指定做业最终(即reduce操做)输出的key-value键值对的数据类型,这个要与Reducer实现类代码中指定的Reducer类中的泛型参数保持一致。须要注意的是,若是map操做的输出类型与最终的输出类型不一致,则须要显示的单独设置map的输出类型,可调用Job类的setMapOutputKeyClass和setMapOutputValueClass方法进行设置,由于咱们这个例子中map的输出类型和reduce的输出类型一致,因此不用单独再设置map的输出类型。
在咱们这个例子,任务的输入来自文件,输出也是写入文件。因此须要设置输入路径和输出路径,输入路径经过调用FileInputFormat类的静态方法setInputPaths来设置,能够是一个文件名,也能够是一个目录,若是是一个目录,则该目录下的全部文件都会被做为输入文件处理;输出路径经过调用FileOutputFormat类的静态方法setOutputPath来设置,须要注意的是,mr框架要求在做业运行前该输出目录是不存在的,若是存在,程序会报错。
最后调用waitForCompletion方法来提交做业,参数传入true表示等待做业完成方法才返回,这样整个做业完成后main方法才会结束。
写好mr程序后,正常状况下咱们是要把代码打成jar包,而后提交到hadoop集群环境下去运行。但若是咱们每次都在集群环境下去验证代码的正确性,就比较复杂,一来集群环境准备比较麻烦,二来执行比较耗时,三来调试、查找问题比较麻烦。所以,咱们最好先能在本地进行验证,先保证代码逻辑是正确的。
好在mr程序能够在本地执行,咱们能够在本地准备一个小型数据进行测试,以验证代码是否有问题。当确保没有代码的问题后,咱们再拿到集群上去验证性能等问题。
要想mr程序在本地运行,咱们须要设置Mr程序不使用hdfs文件系统上的文件(而使用本地文件),同时不使用yarn进行资源调度。咱们须要在代码里进行参数的设置,如:
Configuration conf= **new** Configuration(); conf.set("fs.defaultFS","file:///"); conf.set("mapreduce.framework.name","local"); Job job = Job.getInstance(conf);
上面的代码是前面例子中的代码,"fs.defaultFS"参数表明使用哪一个文件系统,这里设置值为"file:///"表示使用本地文件系统;"mapreduce.framework.name"参数表明执行Mr程序的方式,这里设置值为"local"表示使用本地的方式,不使用yarn。若是不在代码中进行设置,这些参数的值是取的当前环境下的hadoop配置文件中设置的值,具体可参考《Hadoop运行环境搭建》中的配置文件设置介绍。这样咱们须要建立一个Configuration 对象,进行相关参数设置后,并传给建立Job对象的getInstance静态方法。
另外须要注意的是,须要把前面章节中例子代码中的输入、输出路径改成实际的本地路径。
通常状况下,咱们会在IDE工具中(如eclipse,intellij)中进行代码的开发,为了编译经过,须要引入所依赖的相应的jar包,这有两种方式,一是利用maven的pom文件自动引入,二是直接在IDE中显示的设置。要想编译没问题,只须要引入hadoop-common-2.7.6.jar(位于安装目录的share\hadoop\common目录下)和hadoop-mapreduce-client-core-2.7.6.jar(位于安装目录的share\hadoop\mapreduce目录下)。
上面引入的两个Jar包只能让编译经过。但若是要执行mr程序,须要依赖更多Jar包。最简单的运行方式是,将Mr程序编译后的class打成jar包,而后利用hadoop jar命令来执行,该命令会自动引入执行Mr程序须要的jar包。
假设上面的wordcount例子的代码已经打成Jar包,jar包名为wordcount.jar。进入命令行界面,当前目录为wordcount.jar所在的目录,而后执行:
hadoop jar wordcount.jar com.mrexample.wordcount.CountMain
会有不少信息在控制台上输出,若是执行成功,咱们打开上面代码中设置的输出目录,会发现生成了不少文件。其中结果位于一个或多个part-r-xxxxx文件中,其中xxxxx是一串数字编号,从00000开始。打开part-r-xxxxx文件,能够检查输出结果是否与预期一致,从而判断代码逻辑是否正确。
经过上面的统计单词重复次数的例子,咱们能够看出,编写mr程序的关键是根据需求,依照mr模型的要求,设计出相应的map函数和reduce函数。咱们再来看一个更简单的例子,加深下理解。
假设有不少文本文件,文件中的各行文本数据有重复的,咱们须要将这些文件中重复的行(包括不一样文件中的重复行)去除掉。这个若是采用mr来实现,就很是简单了。
首先咱们考虑map函数怎么写?由于对于文本文件,mr框架默认处理后传给map的key-value键值对的key是行首字母在文件中的位置,value是该行的文本。因此咱们的map只需将行文本做为Key输出,对应的vaule没有做用,能够是一个空串。代码如:
public class RemoveMap extends Mapper<LongWritable, Text, Text, Text> { private Text tag = new Text(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, tag); } }
这样map输出的key-value数据通过mr框架shuffle操做后,输出的数据的key就是不重复的行数据了(即没有重复的行了)。这样咱们的reduce函数只需将传入的weikey输出便可,代码如:
public class RemoveReduce extends Reducer<Text, Text, Text, Text>{ private Text tag = new Text(); protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException { context.write(key, tag); } }
能够看出,mr程序实际上就是将输入转为key-value格式的数据流,分别通过map函数和reduce函数处理后,最后输出key-value格式的数据。这点与函数式编程的中的高阶函数map和reduce的概念很是相似,map是将一个数据集合转换为另外一个数据集合,reduce是对一个数据集合进行聚合等相应的操做。
前面的例子,mr处理的数据来自文本文件,最后生成的结果也到文本文件中。这时是Mr框架采用默认的方式来读取数据和写入数据的。在实际的场景中,咱们的数据可能不是来自于文件,输出也不必定写入文件中。或者即便是文件,也多是二进制的,不是文本文件。
其实MR能够处理不少不一样类型的数据格式。这个咱们在后续的文章中再介绍。