Describing the role of the combiner component in one simple sentence: it shrinks the map tasks' output before it is shuffled to the reduce tasks, thereby lowering the network load.
How it works:
A map task is allowed to run a local aggregation pass over its output before that output is handed to the reduce tasks; this local pass is the combiner component. The combiner behaves just like a reducer: it receives a key with its values and emits key/value output.
Notes:
1. The combiner's output becomes the reducer's input.
2. Because the combiner is pluggable (the framework may apply it zero, one, or several times), it must never change the final result; the sketch after these notes illustrates this.
3. The combiner is an optimization component, but it cannot be used everywhere: it only applies when the reducer's input and output key/value types are exactly the same and applying the combiner does not affect the final result.
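To make note 2 concrete, here is a minimal plain-Java sketch (not Hadoop code; the class name CombinerSafetyDemo and the sample values are made up for illustration) showing why a sum can safely be pre-aggregated by a combiner while an average cannot:

import java.util.Arrays;
import java.util.List;

public class CombinerSafetyDemo {
    public static void main(String[] args) {
        // Values for the same key, split across two "map task" partitions
        List<Long> partA = Arrays.asList(1L, 2L, 3L);
        List<Long> partB = Arrays.asList(10L);

        // SUM: pre-aggregating each partition first gives the same total (16),
        // so a summing combiner never changes the final result.
        long directSum = sum(partA) + sum(partB);
        long combinedSum = sum(Arrays.asList(sum(partA), sum(partB)));
        System.out.println("sum:     " + directSum + " == " + combinedSum);

        // AVERAGE: averaging the per-partition averages gives 6.0 instead of the
        // true 4.0, so an averaging combiner would corrupt the final result.
        double directAvg = (double) (sum(partA) + sum(partB)) / (partA.size() + partB.size());
        double avgOfAvgs = (avg(partA) + avg(partB)) / 2.0;
        System.out.println("average: " + directAvg + " != " + avgOfAvgs);
    }

    static long sum(List<Long> xs) {
        long total = 0;
        for (long x : xs) {
            total += x;
        }
        return total;
    }

    static double avg(List<Long> xs) {
        return (double) sum(xs) / xs.size();
    }
}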
Example: in the WordCount program, which counts how many times each word occurs, each map task can first aggregate its own output locally (the combiner), and the aggregated results are then handed to the reduce task, which performs the final aggregation of identical keys across all map tasks. For instance, a map task's local output (hello,1), (hello,1), (hello,1) is condensed to (hello,3) before being shuffled, as shown in the figure:
Combiner code:
If you look at the source, a combiner is simply a Reducer, so we just extend the Reducer class directly and specify our combiner class when submitting the job.
package com.itheima.hadoop.mapreduce.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted for this word by the local map task
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Mapper class:
package com.itheima.hadoop.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountCombinerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();   // read one line of input
        String[] words = line.split(" "); // split the line into words
        for (String word : words) {
            // emit each word with a count of 1
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
Driver class:
package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;

public class WordCountCombinerDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /*
         * The five steps of job submission:
         * 1. create the job
         * 2. set the map/reduce classes
         * 3. set the MapReduce output data types
         * 4. set the input/output paths
         * 5. submit the job
         */
        Configuration conf = getConf(); // use the configuration populated by ToolRunner
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountCombinerDriver.class);

        job.setMapperClass(WordCountCombinerMapper.class);

        /*** The key point of this example: plugging in the combiner component ***/
        job.setCombinerClass(WordCountCombiner.class);
        /*** The key point of this example: plugging in the combiner component ***/

        // The reduce logic is identical to the combiner logic, and the combiner
        // already extends Reducer, so the same class is reused as the reducer
        job.setReducerClass(WordCountCombiner.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Main class:
package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.drivers.WordCountCombinerDriver;

public class WordCountCombinerRunner {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCountCombinerDriver(), args);
        System.exit(res);
    }
}
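Assuming the classes are packaged into a job jar (the jar name and HDFS paths below are placeholders, not taken from the original), the job could be launched in the usual way with the hadoop jar command:

hadoop jar wordcount-combiner.jar com.itheima.hadoop.runner.WordCountCombinerRunner /wordcount/input /wordcount/output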
Run result: