hadoop笔记六:MapReduce基础

1.概念

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题。java

MapReduce分两部分组成web

①映射(Mapping):对集合里面的每个目标进行相同的操做,好比你要将一个表单里面的每一个单元格作乘以2的操做,那么你就能够将乘以2这个方法应用到表单里面的每一个单元格上面。apache

②化简(Reducing):遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。编程

执行过程:你向MapReduce框架提交一个计算做业时,它会首先把计算做业拆分红若干个Map任务,而后分配到不一样的节点上去执行,每个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会做为Reduce任务的输入数据。Reduce任务的主要目标就是把前面若干个Map的输出汇总到一块儿并输出。app

2.执行过程

①Map函数:(k1 : v1) -->[(k2 : v2)]框架

→输入:键值对(k1 : v1)表示的数据。ide

→处理:文档数据记录(如文本文件中的一行)以键值对的形式传入map函数,处理完成以后以另外一种键值对的形式输出处理结果[(k2 : v2)]。函数

→输出:键值对[(k2 : v2)]表示的一组中间数据。oop

②Reduce函数:(k2 : [v2]) --> [(k3 : k4)]测试

→输入:map输出的一组键值对[(k2 : v2)]将被进行合并处理将一样主键下的不一样值合并到一个列表[v2]中,故reduce的输入为(k2 : [v2])。

→处理:对传入的中间结果列表数据进行某种整理或进一步处理,并输出最终的某种的键值对形式的输出结果[(k3 : k4)]。

→输出:键值对[(k3 : k4)]表示最终数据。

注意:各个map函数对所划分的数据进行并行处理,从不一样的输入数据产生不一样的输出数据。进行reduce处理以前必须等到全部的map函数作完。最终汇总全部的reduce的输出结果便可得到最终结果。

3.举例:统计姓名

1)统计下面文件中姓名出现的次数

1,sean

2,bob

3,sean

4,bob

5,jf

从上面的数据分析出,咱们须要的是一行数据中的后一个数据,在map函数中,输入端v1表明的是一行数据,输出端的k2能够表明是被统计的姓名。在reduce函数中,k2仍是被统计的姓名,而[v2]是一个数据集,这里是将k2相同的键的v2数据合并起来。输出的是本身须要的数据k3表明的是统计的姓名,v3是姓名出现的次数。

代码实现:

解析文件数据

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class NameRecordParser {

	private String nameId;
	private String name;
	private boolean valid;

	// 解析每行数据
	public void parse(String line) {
		String[] strs = line.split(",");
		if (strs.length == 2) {
			nameId = strs[0].trim();
			name = strs[1].trim();
			if (nameId.length() > 0 && name.length() > 0) {
				valid = true;
			}
		}
	}

	public void parse(Text line) {
		parse(line.toString());

	}

	public String getNameId() {
		return nameId;
	}

	public void setNameId(String nameId) {
		this.nameId = nameId;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}

}

MapReduce处理

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class NameReference extends Configured implements Tool {

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		String input = conf.get("input");
		String output = conf.get("output");

		// 构建做业配置
		Job job = Job.getInstance(conf, "NameReference");
		// 设置做业所要执行的类
		job.setJarByClass(this.getClass());
		// 设置自定义的mapper类,以及tapper类的输出key和value类型。
		job.setMapperClass(NameMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 设置自定义的Reducer类以及输出时的类型
		job.setReducerClass(NameReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 设置读取最原始数据的格式信息以及
		// 数据输出到HDFS集群中的格式信息
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.addInputPath(job, new Path(input));
		TextOutputFormat.setOutputPath(job, new Path(output));

		return job.waitForCompletion(true) ? 0 : 1;
	}

	private static class NameMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		private NameRecordParser parser = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			this.parser = new NameRecordParser();
			parser.parse(value);
			System.out.println(value);
			if (parser.isValid()) {
				Text resultKey = new Text(parser.getName());
				IntWritable resultValue = new IntWritable(1);
				System.out.println("map:resultKey=" + resultKey.toString() + ",value=" + resultValue.get());
				context.write(resultKey, resultValue);
			}
		}

	}

	private static class NameReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable intWritable : values) {
				count += intWritable.get();
			}
			System.out.println("reduce:key=" + key + ",value=" + count);
			context.write(key, new IntWritable(count));
		}
	}

	public static void main(String[] args) {
		try {
			System.exit(ToolRunner.run(new NameReference(), args));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

2)执行测试结果

新建测试数据文件

提交文件到hadoop文件系统中

确认文件提交成功

执行MapReduce分析数据

能够经过web查看执行进度

http://192.168.1.113:8088/cluster/apps

查看执行结果

也能够经过日志查看执行过程

4.举例:统计每一年最大气温

1)统计下面数据中每一年最高气温

014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
014399999999999/1994-07-31/35

提交数据文件到文件系统中

代码解析数据文件

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class WeatherRecordParser {

	private String stationId;
	private String year;
	private int temperature = -999;
	private boolean valid;

	/**
	 * 解析数据
	 * 
	 * @param line
	 *            气象站/年月日/温度 
	 *  014399999999999/1992-01-31/10
	 */
	public void parse(String line) {
		String[] strs = line.split("/");
		if (strs.length == 3) {
			if (strs[0] != null && strs[0].length() > 0) {
				stationId = strs[0];
			}
			if (strs[1] != null && strs[1].length() > 0) {
				year = strs[1].substring(0, 4);
			}
			if (strs[2] != null && strs[2].length() > 0) {
				temperature = Integer.parseInt(strs[2]);
			}
			if (stationId != null && year != null & temperature > -999) {
				valid = true;
			}
		}
	}

	public void parse(Text value) {
		parse(value.toString());
	}

	public String getStationId() {
		return stationId;
	}

	public void setStationId(String stationId) {
		this.stationId = stationId;
	}

	public String getYear() {
		return year;
	}

	public void setYear(String year) {
		this.year = year;
	}

	public int getTemperature() {
		return temperature;
	}

	public void setTemperature(int temperature) {
		this.temperature = temperature;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}
}

统计每一年最大气温

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 获取每一年最高温度
 * 
 * @author Administrator
 *
 */
public class MaxTemperatureByYear extends Configured implements Tool {

	private static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		private WeatherRecordParser parser = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			parser = new WeatherRecordParser();
			parser.parse(value);
			if (parser.isValid()) {
				Text resultKey = new Text(parser.getYear());
				IntWritable resultValue = new IntWritable(parser.getTemperature());
				System.out.println("map:resultKey=" + resultKey + ",resultValue=" + resultValue);
				context.write(resultKey, resultValue);
			}
		}
	}

	private static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// 获取相同key中最大的值
			int max = Integer.MIN_VALUE;
			for (IntWritable intWritable : values) {
				if (intWritable.get() > max) {
					max = intWritable.get();
				}
			}
			System.out.println("reducer:key=" + key + ",value=" + max);
			context.write(key, new IntWritable(max));
		}
	}

	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		// 构建做业所处理数据的输入输出路径
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));
		// 构建做业配置
		Job job = Job.getInstance(conf, MaxTemperatureByYear.class.getName());
		// 设置做业所要执行的类
		job.setJarByClass(MaxTemperatureByYear.class);
		// 设置自定义的mapper类,以及tapper类的输出key和value类型。
		job.setMapperClass(MaxTempMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 设置自定义的Reducer类以及输出时的类型
		job.setReducerClass(MaxTempReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 设置读取最原始数据的格式信息,以及数据输出到HDFS集群中的格式信息
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MaxTemperatureByYear(), args));
	}

}

提交任务执行

2)执行结果

File System Counters
		FILE: Number of bytes read=237
		FILE: Number of bytes written=233027
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=755
		HDFS: Number of bytes written=168
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=7929
		Total time spent by all reduces in occupied slots (ms)=8445
		Total time spent by all map tasks (ms)=7929
		Total time spent by all reduce tasks (ms)=8445
		Total vcore-seconds taken by all map tasks=7929
		Total vcore-seconds taken by all reduce tasks=8445
		Total megabyte-seconds taken by all map tasks=8119296
		Total megabyte-seconds taken by all reduce tasks=8647680
	Map-Reduce Framework
		Map input records=21
		Map output records=21
		Map output bytes=189
		Map output materialized bytes=237
		Input split bytes=106
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=237
		Reduce input records=21
		Reduce output records=21
		Spilled Records=42
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=221
		CPU time spent (ms)=2330
		Physical memory (bytes) snapshot=310419456
		Virtual memory (bytes) snapshot=1687691264
		Total committed heap usage (bytes)=164040704
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=649
	File Output Format Counters 
		Bytes Written=168

能够经过web页面查看执行状态

查看执行日志

map计算日志输出

reduce计算统计日志

在文件系统中查看执行结果

5.配置执行reduce个数(map个数跟存储文件的block个数有关)

1)经过配置文件设置

修改配置文件:yarn-site.xml

从新执行时能够看到reduce个数为2

2)经过程序进行修改

测试执行

3)经过执行命令设置

6.程序在执行时如何选择reduce

任务分配:

①假设咱们有一个HDFS集群有4个节点分别是us1,us2,us3,us4。Yarn集群的主节点在分配资源的时候,当你客户端将做业提交的时候,resourcemanager在分配资源(或者说分配做业)的时候,尽可能将应用程序分发到有数据的节点上。这样就避免了数据在节点与节点之间传输。

②那么在us1,us2,us3中都至少有一个map任务,当map输出后通过洗牌,会根据key值的不一样生成不少组以key不一样的数据,好比咱们输出了(k21 : [v21]),(k22 : [v22])。咱们知道前面的map是并行执行的(多个map同时运行,由于处理的数据在不一样的数据块),当咱们的reduce为默认的时候是有1个,是有一个reduce因此不多是并行。咱们的reduce只有一个,而又两组数据那么哪一个先执行?Hadoop是这样规定的,咱们对数据进行分组是根据key值来分组的。那么Hadoop会让这一系列的key去比较大小,最小的先进入执行,执行完成后,按照从小到大去执行

③当reduce任务执行完成以后会生成一个文件:part-r-00000

若是咱们有2个reduce,也有2组数据,那么这个并行计算如何进行。

 Hadoop会让每一组数据的key值得hash值去和reduce的个数取余,余数是几那么就进入哪一个reduce。固然前提是给reduce编号(编号是Hadoop内部本身会去编)。

第一个reduce生成的是part-r-00000,第二个则是part-r-00001(后面的00000和00001就是reduce的编号)。例如:当第一组数据key的hash值与reduce个数取余为0则会让第一个reduce执行,当第二组数据key的hash值与reduce个数取余也为0,一样会让第一个reduce执行。这样第二个reduce一样会生成一个结果文件,第一个文件里面存放的是第一组和第二组数据结果,第二个文件为空。

 

数据分组和数据分片

①数据分片:

咱们把进入map端的数据叫作数据分片。每个数据块进入MapReudce中的map程序的时候,咱们把它叫作数据分片。

那什么样的数据是一个数据分片?HDFS集群上的一个数据块的数据对应咱们所说的数据分片。 

也就是每个数据分片由每个map任务去处理。

②数据分组:

数据通过map处理以后分红不一样的组造成数据的过程叫作数据分组。

相关文章
相关标签/搜索