以前的文章已经简单介绍过mapreduce的运做流程,不过其内部的shuffle过程并未深刻讲解;本篇博客将分享shuffle的全过程。java
1、mapreduce运做流程长卷图(其中[深]朱红色表明是能够用户自定义的部分,固然它们有默认实现)apache
2、shuffle过程当中的combiner自定义实现服务器
首先combiner组件有什么做用呢?它能够减小咱们在shuffle归并排序是的次数、reduce阶段处理的数据次数,同时能够有效提供程序的执行效率。app
如下是wordcount使用combiner实现的代码框架
(1) maper实现:ide
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * WordcountMapper.java的实现描述: KEYIN: 默认状况下,是mr框架所读到的一行文本的起始偏移量,Long, * 可是在hadoop中有本身的更精简的序列化接口,因此不直接用Long,而用LongWritable * VALUEIN:默认状况下,是mr框架所读到的一行文本的内容,String,同上,用Text * KEYOUT:是用户自定义逻辑处理完成以后输出数据中的key,在此处是单词,String,同上,用Text * VALUEOUT:是用户自定义逻辑处理完成以后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable 类 * * @author arron 2018年12月4日 下午9:30:09 */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map阶段的业务逻辑就写在自定义的map()方法中 maptask会对每一行输入数据调用一次咱们自定义的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给咱们的文本内容先转换成String String line = value.toString(); //根据空格将这一行切分红单词 String[] words = line.split(" "); //将单词输出为<单词,1> for (String word : words) { //将单词做为key,将次数1做为value,以便于后续的数据分发,能够根据单词分发,以便于相同单词会到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
(2) reducer实现实现:oop
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 类 WordcountReducer.java的实现描述:KEYIN, VALUEIN 对应 mapper输出的KEYOUT,VALUEOUT类型对应 * KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VLAUEOUT是总次数 * * @author arron 2018年12月4日 下午9:51:15 */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1> * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * 入参key,是一组相同单词kv对的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; /* * Iterator<IntWritable> iterator = values.iterator(); * while(iterator.hasNext()){ count += iterator.next().get(); } */ for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
(3) combiner实现实现:大数据
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 类 WordcountCombiner.java的实现描述:输如为map的输出 * * @author arron 2018年12月4日 下午9:29:25 */ public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v : values) { count += v.get(); } context.write(key, new IntWritable(count)); } }
(4) mapreduce主程序驱动类实现:3d
package com.empire.hadoop.mr.wccombinerdemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 类 WordcountDriver.java的实现描述:至关于一个yarn集群的客户端 须要在此封装咱们的mr程序的相关运行参数,指定jar包 * 最后提交给yarn * * @author arron 2018年12月4日 下午9:29:48 */ public class WordcountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //是否运行为本地模式,就是看这个参数值是否为local,默认就是local /* conf.set("mapreduce.framework.name", "local"); */ //本地模式运行mr程序时,输入输出的数据能够在本地,也能够在hdfs上 //到底在哪里,就看如下两行配置你用哪行,默认就是file:/// /* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */ /* conf.set("fs.defaultFS", "file:///"); */ //运行集群模式,就是把程序提交到yarn中去运行 //要想运行为集群模式,如下3个参数要指定为集群上的值 /* * conf.set("mapreduce.framework.name", "yarn"); * conf.set("yarn.resourcemanager.hostname", "mini1"); * conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */ Job job = Job.getInstance(conf); job.setJar("c:/wc.jar"); //指定本程序的jar包所在的本地路径 /* job.setJarByClass(WordcountDriver.class); */ //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定须要使用combiner,以及用哪一个类做为combiner的逻辑 /* job.setCombinerClass(WordcountCombiner.class); */ job.setCombinerClass(WordcountReducer.class); //若是不设置InputFormat,它默认用的是TextInputformat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); CombineTextInputFormat.setMinInputSplitSize(job, 2097152); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行 /* job.submit(); */ boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
3、最后总结code
虽然combiner组件在shuffle阶段使用的话,能够提升程序效率;可是,它有一个使用限制条件,那就是不能影响最后的执行结果;例如:这里讲述一个反例,对多个输入的数进行求平均数,若是此时使用combiner将不能获得正确的结果。
最后寄语,以上是博主本次文章的所有内容,若是你们以为博主的文章还不错,请点赞;若是您对博主其它服务器大数据技术或者博主本人感兴趣,请关注博主博客,而且欢迎随时跟博主沟通交流。