Hadoop Notes 8: Optimizing MapReduce with a Combiner

1. Introduction

Every time a map task runs it produces a large amount of intermediate output, and before the reduce phase can merge and compute, that intermediate data has to be transferred from the nodes running the map tasks to the nodes running the reduce tasks. This step involves heavy disk I/O and network transfer, which lowers the efficiency of the whole computation.

In this situation we need to do the necessary merging of the map output locally first, shrinking the volume of intermediate data before it is sent to the reducers for the final computation.

2. Usage

Functionally, a combiner is essentially a reducer; what differs is where it runs and which data it sees. A combiner runs on the map node where the data resides, and it processes only the intermediate output of that local map task.

A combiner is implemented by extending Reducer, just like a reducer, and is registered on the job as shown below. Note that Hadoop may invoke the combiner zero, one, or several times per map task, so the job must produce correct results no matter how often it runs.

// set the combiner class
job.setCombinerClass(AverageCombiner.class);

3. Example 1

1) Data

tmpIn.txt

014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-31/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36

2) Analysis

We want the average temperature for each year. If the file is processed by several map tasks, we can first aggregate on each map task, producing a per-year partial average (together with the count it is based on) for that map, and then let the reducer combine the partial results into the overall per-year average.
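Carrying the count along matters because an average of averages is not the overall average. Suppose, hypothetically, one map task sees the 1992 readings 10 and 11 (partial average 10.5, count 2) and another sees only 14 (average 14, count 1). The correct combined average is (10.5 × 2 + 14 × 1) / 3 ≈ 11.67, which matches (10 + 11 + 14) / 3; naively averaging the two partial averages would give (10.5 + 14) / 2 = 12.25, which is wrong.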

3) Code

The average value object:

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

/**
 * Holds a running average and the number of values it is based on.
 * 
 * @author Administrator
 *
 */
public class AverageValue implements Writable {

	// number of values the average is based on
	private VIntWritable num;
	// the running average
	private DoubleWritable avgValue;

	public AverageValue() {
		num = new VIntWritable();
		avgValue = new DoubleWritable();
	}

	public void write(DataOutput out) throws IOException {
		num.write(out);
		avgValue.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		num.readFields(in);
		avgValue.readFields(in);
	}

	public VIntWritable getNum() {
		return num;
	}

	public void setNum(VIntWritable num) {
		this.num = num;
	}

	public DoubleWritable getAvgValue() {
		return avgValue;
	}

	public void setAvgValue(DoubleWritable avgValue) {
		this.avgValue = avgValue;
	}

}
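A note on the design: because AverageValue stores (count, average), the combiner and reducer recover each partial sum as avgValue × num before re-averaging. Storing (count, sum) instead would work just as well and would avoid the extra multiplication; the pair used here simply mirrors the value we ultimately want to output.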

The job implementation:

package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;

public class AverageTempCombiner extends Configured implements Tool {

	// mapper class
	static class AverageMapper extends Mapper<LongWritable, Text, Text, AverageValue> {

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, AverageValue>.Context context)
				throws IOException, InterruptedException {
			// each line has the format: 014399999999999/1992-01-31/10
			String line = value.toString();
			if (line != null && line.length() > 0) {
				String[] strs = line.split("/");
				if (strs.length == 3) {
					String time = strs[1];
					// extract the year
					String year = time.substring(0, time.indexOf("-"));

					double temp = Double.parseDouble(strs[2]);
					// build the average value object
					AverageValue averageValue = new AverageValue();
					averageValue.setNum(new VIntWritable(1));
					averageValue.setAvgValue(new DoubleWritable(temp));
					// emit the year with its average value object
					context.write(new Text(year), averageValue);
				}
			}
		}
	}

	// A combiner is essentially a reducer, so it extends Reducer; the difference is that it
	// does a first round of aggregation on the map side, over the local map's output only.
	static class AverageCombiner extends Reducer<Text, AverageValue, Text, AverageValue> {
		@Override
		protected void reduce(Text key, Iterable<AverageValue> values,
				Reducer<Text, AverageValue, Text, AverageValue>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			// build the combined AverageValue, which becomes the reducer's input
			AverageValue avgValue = new AverageValue();
			avgValue.setNum(new VIntWritable(num));
			avgValue.setAvgValue(new DoubleWritable(sumValue / num));
			context.write(key, avgValue);
		}
	}

	// the final reduce computation
	static class AverageReducer extends Reducer<Text, AverageValue, Text, DoubleWritable> {
		@Override
		protected void reduce(Text key, Iterable<AverageValue> values,
				Reducer<Text, AverageValue, Text, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			context.write(key, new DoubleWritable(sumValue / num));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(AverageMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(AverageValue.class);

		job.setReducerClass(AverageReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

		// set the combiner class
		job.setCombinerClass(AverageCombiner.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new AverageTempCombiner(), args));
	}

}

4) Result

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AverageTempCombiner -Dinput=input/tempIn -Doutput=output/tempCombiner
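With the sample data above, the output should contain one tab-separated line per year, roughly 1992 ≈ 21.29, 1993 ≈ 23.71, and 1994 ≈ 21.33 (averages computed by hand from the input; the exact decimal formatting comes from DoubleWritable). You can also confirm the combiner actually ran by checking the job counters "Combine input records" and "Combine output records".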

4. Example 2

1) Data

Same data as Example 1.

2) Analysis

This time we compute the average temperature per weather station per year, so the key has to combine the year and the station ID into a composite key.

For that we write a YearStation class. Since YearStation must be both serializable and comparable, it implements WritableComparable<YearStation>.

The hash code matters because of partitioning, that is, deciding which reducer each record goes to: the key's hashCode is taken modulo the number of reducers. (IntWritable's hash code, for example, is simply the int value it wraps.) So partitioning is driven by the key's hashCode, as the sketch below shows.
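For reference, this is essentially what Hadoop's default partitioner, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, does:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	// mask off the sign bit, then map the key's hash onto [0, numReduceTasks)
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}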

Comparability is needed because the keys that enter the same reducer are sorted: the smaller key is processed first.

If we do not override hashCode, Object's hashCode() is used, which is based on object identity rather than field values, so equal-valued YearStation keys would not hash consistently. We should therefore override both hashCode and equals; and since year and stationId must both participate in partitioning, the overridden hashCode has to depend on both fields.

Note: in this example we also need to override toString(), because this key is written to the result file in HDFS; without it the output would just show an object address. The reducer's output key and value are separated by a tab character ("\t").

3) Code

The composite key:

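A minimal sketch of YearStation, written to match the requirements above and the way the mapper below uses it (setYear/setStation); treat details such as the hash multiplier as one reasonable choice rather than the original author's exact code:

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key: year plus weather station id.
 */
public class YearStation implements WritableComparable<YearStation> {

	private Text year = new Text();
	private Text station = new Text();

	public void write(DataOutput out) throws IOException {
		year.write(out);
		station.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		year.readFields(in);
		station.readFields(in);
	}

	// sort by year first, then by station
	public int compareTo(YearStation o) {
		int cmp = year.compareTo(o.year);
		return cmp != 0 ? cmp : station.compareTo(o.station);
	}

	// both fields participate, so partitioning depends on year AND station
	@Override
	public int hashCode() {
		return year.hashCode() * 163 + station.hashCode();
	}

	@Override
	public boolean equals(Object obj) {
		if (!(obj instanceof YearStation)) {
			return false;
		}
		YearStation other = (YearStation) obj;
		return year.equals(other.year) && station.equals(other.station);
	}

	// the reducer writes this key into the output file as text
	@Override
	public String toString() {
		return year + "\t" + station;
	}

	public Text getYear() {
		return year;
	}

	public void setYear(Text year) {
		this.year = year;
	}

	public Text getStation() {
		return station;
	}

	public void setStation(Text station) {
		this.station = station;
	}
}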

The combiner job:

package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;
import com.jf.obj.YearStation;

public class AvgByYearStationCombiner extends Configured implements Tool {

	static class AvgMapper extends Mapper<LongWritable, Text, YearStation, AverageValue> {
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, YearStation, AverageValue>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			if (line != null && line.length() > 0) {
				String[] strs = line.split("/");
				if (strs.length == 3) {
					String time = strs[1];
					String year = time.substring(0, time.indexOf("-"));
					// composite key made of year and station
					YearStation yearStation = new YearStation();
					yearStation.setYear(new Text(year));
					yearStation.setStation(new Text(strs[0]));
					// average value object
					AverageValue averageValue = new AverageValue();
					averageValue.setNum(new VIntWritable(1));
					averageValue.setAvgValue(new DoubleWritable(Double.parseDouble(strs[2])));
					System.out.println("map:" + yearStation + "==" + averageValue.getAvgValue().get() + "X"
							+ averageValue.getNum().get());
					context.write(yearStation, averageValue);
				}
			}
		}
	}

	static class AvgCombiner extends Reducer<YearStation, AverageValue, YearStation, AverageValue> {
		@Override
		protected void reduce(YearStation key, Iterable<AverageValue> values,
				Reducer<YearStation, AverageValue, YearStation, AverageValue>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			// build the intermediate average value object
			AverageValue avgValue = new AverageValue();
			avgValue.setNum(new VIntWritable(num));
			avgValue.setAvgValue(new DoubleWritable(sumValue / num));
			System.out.println("combiner:" + key + "==" + avgValue.getAvgValue().get() + "X" + avgValue.getNum().get());
			context.write(key, avgValue);
		}
	}

	static class AvgReducer extends Reducer<YearStation, AverageValue, YearStation, DoubleWritable> {
		@Override
		protected void reduce(YearStation key, Iterable<AverageValue> values,
				Reducer<YearStation, AverageValue, YearStation, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			System.out.println("combiner:" + key + "==" + sumValue / num);
			context.write(key, new DoubleWritable(sumValue / num));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(AvgMapper.class);
		job.setMapOutputKeyClass(YearStation.class);
		job.setMapOutputValueClass(AverageValue.class);

		job.setCombinerClass(AvgCombiner.class);

		job.setReducerClass(AvgReducer.class);
		job.setOutputKeyClass(YearStation.class);
		job.setOutputValueClass(DoubleWritable.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new AvgByYearStationCombiner(), args));
	}

}

4) Result

The job is run the same way as in Example 1; a command sketch follows.
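Assuming the same jar as Example 1 and a fresh output directory (both paths are illustrative and should be adjusted to your cluster):

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AvgByYearStationCombiner -Dinput=input/tempIn -Doutput=output/stationCombiner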

The combiner's intermediate processing can be traced in the task logs through the map:, combiner:, and reduce: lines printed by the code above.
