Every map task can produce a large amount of intermediate output, and that output has to be shipped from the map node to the reduce node before the reducer can merge it. All of this copying means heavy disk I/O and network traffic, which drags down the efficiency of the whole computation.
In such cases we want to pre-aggregate the map output locally, so that the intermediate data shrinks before it is transferred to the reducer for the final computation.
Functionally, a combiner is just a reduce; the only differences are where it runs and what data it sees: a combiner runs on the node that executes the map task, and it only processes the intermediate output of that one map task.
A combiner is therefore implemented by extending Reducer, and it is registered when the job is configured (its output key/value types must match the map output types, since the combiner's output becomes the reducer's input):
// set the combiner class
job.setCombinerClass(AverageCombiner.class);
tmpIn.txt
014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
We want to compute the average temperature for each year. If this file is processed by several map tasks, we can first compute, on each map, the per-year average of that map's share of the data, and then have the reducer combine all the partial results into the final per-year averages.
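Note that partial averages can only be merged correctly if each one carries the number of records it was computed from, which is why the value type below stores a count next to the average. A quick check with made-up numbers: if one map produces 3 records for 1992 with an average of 20 and another map produces 1 record with an average of 40, the correct yearly average is (3 × 20 + 1 × 40) / (3 + 1) = 25, whereas naively averaging the two averages would give 30.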
Average value object
package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

/**
 * Holds an average value together with the number of records it was computed from.
 *
 * @author Administrator
 */
public class AverageValue implements Writable {

    // number of records that went into the average
    private VIntWritable num;
    // the average value
    private DoubleWritable avgValue;

    public AverageValue() {
        num = new VIntWritable();
        avgValue = new DoubleWritable();
    }

    public void write(DataOutput out) throws IOException {
        num.write(out);
        avgValue.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        num.readFields(in);
        avgValue.readFields(in);
    }

    public VIntWritable getNum() {
        return num;
    }

    public void setNum(VIntWritable num) {
        this.num = num;
    }

    public DoubleWritable getAvgValue() {
        return avgValue;
    }

    public void setAvgValue(DoubleWritable avgValue) {
        this.avgValue = avgValue;
    }
}
Implementation
package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;

public class AverageTempCombiner extends Configured implements Tool {

    // mapper: emits one (year, AverageValue(count=1, temperature)) pair per input line
    static class AverageMapper extends Mapper<LongWritable, Text, Text, AverageValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, AverageValue>.Context context)
                throws IOException, InterruptedException {
            // each input line looks like: 014399999999999/1992-01-31/10
            String line = value.toString();
            if (line != null && line.length() > 0) {
                String[] strs = line.split("/");
                if (strs.length == 3) {
                    String time = strs[1];
                    // extract the year
                    String year = time.substring(0, time.indexOf("-"));
                    Double temp = Double.parseDouble(strs[2]);
                    // build the average object
                    AverageValue averageValue = new AverageValue();
                    averageValue.setNum(new VIntWritable(1));
                    averageValue.setAvgValue(new DoubleWritable(temp));
                    // emit the year together with the average object
                    context.write(new Text(year), averageValue);
                }
            }
        }
    }

    // the combiner is essentially a reducer, so it also extends Reducer; it performs a first
    // round of aggregation on the map side, over the output of the local map task only
    static class AverageCombiner extends Reducer<Text, AverageValue, Text, AverageValue> {
        @Override
        protected void reduce(Text key, Iterable<AverageValue> values,
                Reducer<Text, AverageValue, Text, AverageValue>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            // build the combined AverageValue that will become the reducer's input
            AverageValue avgValue = new AverageValue();
            avgValue.setNum(new VIntWritable(num));
            avgValue.setAvgValue(new DoubleWritable(sumValue / num));
            context.write(key, avgValue);
        }
    }

    // final reducer: weighted average over all partial results
    static class AverageReducer extends Reducer<Text, AverageValue, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<AverageValue> values,
                Reducer<Text, AverageValue, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            context.write(key, new DoubleWritable(sumValue / num));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(AverageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AverageValue.class);

        job.setReducerClass(AverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // set the combiner class
        job.setCombinerClass(AverageCombiner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AverageTempCombiner(), args));
    }
}
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AverageTempCombiner -Dinput=input/tempIn -Doutput=output/tempCombiner
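The combiner does not change the final result, because the reducer recombines the partial averages weighted by their record counts. For the sample file above, the output (key and value separated by a tab) should therefore be approximately 21.29 for 1992 (149/7), 23.71 for 1993 (166/7) and 21.33 for 1994 (128/6); the values actually printed carry the full double precision.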
This example uses the same data as the first one.
This time we compute the average temperature of each weather station for each year, so the key has to combine the year and the station id (a composite key).
For that we write a YearStation class. Because YearStation must be both serializable and comparable, it has to implement WritableComparable&lt;T&gt;.
A hash value is needed for partitioning, i.e. for deciding which reducer a record goes to: the partition is determined by taking the key's hashCode modulo the number of reducers. (IntWritable's default hash value is simply the int it wraps.) So partitioning is driven by the key's hashCode.
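For reference, this is essentially what Hadoop's default HashPartitioner does; the sketch below mirrors that behavior (the bit mask only keeps the hash non-negative):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    // choose the reduce task from the key's hash code
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}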
The key also has to be comparable because, among the groups of records that reach the same reducer, the processing order is determined by comparing the keys: the smaller key is processed first.
If we do not override hashCode(), Object's identity-based hashCode() is used: a YearStation object that is reused for different records would then always produce the same hash code regardless of its field values. So we should override both hashCode() and equals(). Since year and stationId must both take part in partitioning, the overridden hashCode() has to depend on both fields.
Note: in this example we also have to override toString(), because this key ends up in the result file on HDFS; without an override the output would just be the default object representation (something like a YearStation@address). And what separates the key and the value in the reducer's output? A tab character ("\t"). A sketch of such a class is given below.
Composite key
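The YearStation source itself is not listed here, so the following is only a minimal sketch of what such a composite key might look like, based on the description above and on the setYear/setStation calls used in the mapper below (field names, the hash formula and the package are assumptions):

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key: year + weather station id (minimal sketch).
 */
public class YearStation implements WritableComparable<YearStation> {

    private Text year = new Text();
    private Text station = new Text();

    public void write(DataOutput out) throws IOException {
        year.write(out);
        station.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        station.readFields(in);
    }

    // sort order: first by year, then by station id
    public int compareTo(YearStation o) {
        int cmp = year.compareTo(o.year);
        return cmp != 0 ? cmp : station.compareTo(o.station);
    }

    // both fields take part in partitioning
    @Override
    public int hashCode() {
        return year.hashCode() * 163 + station.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof YearStation)) {
            return false;
        }
        YearStation other = (YearStation) obj;
        return year.equals(other.year) && station.equals(other.station);
    }

    // the key is written to the result file, so make it readable
    @Override
    public String toString() {
        return year + "\t" + station;
    }

    public Text getYear() { return year; }
    public void setYear(Text year) { this.year = year; }
    public Text getStation() { return station; }
    public void setStation(Text station) { this.station = station; }
}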
Combiner job
package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;
import com.jf.obj.YearStation;

public class AvgByYearStationCombiner extends Configured implements Tool {

    // mapper: emits (YearStation, AverageValue(count=1, temperature)) per input line
    static class AvgMapper extends Mapper<LongWritable, Text, YearStation, AverageValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, YearStation, AverageValue>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line != null && line.length() > 0) {
                String[] strs = line.split("/");
                if (strs.length == 3) {
                    String time = strs[1];
                    String year = time.substring(0, time.indexOf("-"));
                    // composite key made of the year and the station id
                    YearStation yearStation = new YearStation();
                    yearStation.setYear(new Text(year));
                    yearStation.setStation(new Text(strs[0]));
                    // average object
                    AverageValue averageValue = new AverageValue();
                    averageValue.setNum(new VIntWritable(1));
                    averageValue.setAvgValue(new DoubleWritable(Double.parseDouble(strs[2])));
                    System.out.println("map:" + yearStation + "==" + averageValue.getAvgValue().get() + "X" + averageValue.getNum().get());
                    context.write(yearStation, averageValue);
                }
            }
        }
    }

    // map-side pre-aggregation over the local map output
    static class AvgCombiner extends Reducer<YearStation, AverageValue, YearStation, AverageValue> {
        @Override
        protected void reduce(YearStation key, Iterable<AverageValue> values,
                Reducer<YearStation, AverageValue, YearStation, AverageValue>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            // build the intermediate average object
            AverageValue avgValue = new AverageValue();
            avgValue.setNum(new VIntWritable(num));
            avgValue.setAvgValue(new DoubleWritable(sumValue / num));
            System.out.println("combiner:" + key + "==" + avgValue.getAvgValue().get() + "X" + avgValue.getNum().get());
            context.write(key, avgValue);
        }
    }

    // final reducer: weighted average over all partial results
    static class AvgReducer extends Reducer<YearStation, AverageValue, YearStation, DoubleWritable> {
        @Override
        protected void reduce(YearStation key, Iterable<AverageValue> values,
                Reducer<YearStation, AverageValue, YearStation, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            System.out.println("reduce:" + key + "==" + sumValue / num);
            context.write(key, new DoubleWritable(sumValue / num));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(AvgMapper.class);
        job.setMapOutputKeyClass(YearStation.class);
        job.setMapOutputValueClass(AverageValue.class);

        job.setCombinerClass(AvgCombiner.class);

        job.setReducerClass(AvgReducer.class);
        job.setOutputKeyClass(YearStation.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvgByYearStationCombiner(), args));
    }
}
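The job can be launched the same way as the first example; the command below follows that pattern, with the output directory name being an assumption:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AvgByYearStationCombiner -Dinput=input/tempIn -Doutput=output/stationCombiner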
Execution result