MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题。java
MapReduce分两部分组成web
①映射(Mapping):对集合里面的每个目标进行相同的操做,好比你要将一个表单里面的每一个单元格作乘以2的操做,那么你就能够将乘以2这个方法应用到表单里面的每一个单元格上面。apache
②化简(Reducing):遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。编程
执行过程:你向MapReduce框架提交一个计算做业时,它会首先把计算做业拆分红若干个Map任务,而后分配到不一样的节点上去执行,每个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会做为Reduce任务的输入数据。Reduce任务的主要目标就是把前面若干个Map的输出汇总到一块儿并输出。app
①Map函数:(k1 : v1) -->[(k2 : v2)]框架
→输入:键值对(k1 : v1)表示的数据。ide
→处理:文档数据记录(如文本文件中的一行)以键值对的形式传入map函数,处理完成以后以另外一种键值对的形式输出处理结果[(k2 : v2)]。函数
→输出:键值对[(k2 : v2)]表示的一组中间数据。oop
②Reduce函数:(k2 : [v2]) --> [(k3 : k4)]测试
→输入:map输出的一组键值对[(k2 : v2)]将被进行合并处理将一样主键下的不一样值合并到一个列表[v2]中,故reduce的输入为(k2 : [v2])。
→处理:对传入的中间结果列表数据进行某种整理或进一步处理,并输出最终的某种的键值对形式的输出结果[(k3 : k4)]。
→输出:键值对[(k3 : k4)]表示最终数据。
注意:各个map函数对所划分的数据进行并行处理,从不一样的输入数据产生不一样的输出数据。进行reduce处理以前必须等到全部的map函数作完。最终汇总全部的reduce的输出结果便可得到最终结果。
1,sean
2,bob
3,sean
4,bob
5,jf
从上面的数据分析出,咱们须要的是一行数据中的后一个数据,在map函数中,输入端v1表明的是一行数据,输出端的k2能够表明是被统计的姓名。在reduce函数中,k2仍是被统计的姓名,而[v2]是一个数据集,这里是将k2相同的键的v2数据合并起来。输出的是本身须要的数据k3表明的是统计的姓名,v3是姓名出现的次数。
代码实现:
解析文件数据
package com.jf.mapreduce; import org.apache.hadoop.io.Text; public class NameRecordParser { private String nameId; private String name; private boolean valid; // 解析每行数据 public void parse(String line) { String[] strs = line.split(","); if (strs.length == 2) { nameId = strs[0].trim(); name = strs[1].trim(); if (nameId.length() > 0 && name.length() > 0) { valid = true; } } } public void parse(Text line) { parse(line.toString()); } public String getNameId() { return nameId; } public void setNameId(String nameId) { this.nameId = nameId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } }
MapReduce处理
package com.jf.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class NameReference extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = getConf(); String input = conf.get("input"); String output = conf.get("output"); // 构建做业配置 Job job = Job.getInstance(conf, "NameReference"); // 设置做业所要执行的类 job.setJarByClass(this.getClass()); // 设置自定义的mapper类,以及tapper类的输出key和value类型。 job.setMapperClass(NameMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置自定义的Reducer类以及输出时的类型 job.setReducerClass(NameReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置读取最原始数据的格式信息以及 // 数据输出到HDFS集群中的格式信息 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job, new Path(input)); TextOutputFormat.setOutputPath(job, new Path(output)); return job.waitForCompletion(true) ? 0 : 1; } private static class NameMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private NameRecordParser parser = null; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { this.parser = new NameRecordParser(); parser.parse(value); System.out.println(value); if (parser.isValid()) { Text resultKey = new Text(parser.getName()); IntWritable resultValue = new IntWritable(1); System.out.println("map:resultKey=" + resultKey.toString() + ",value=" + resultValue.get()); context.write(resultKey, resultValue); } } } private static class NameReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable intWritable : values) { count += intWritable.get(); } System.out.println("reduce:key=" + key + ",value=" + count); context.write(key, new IntWritable(count)); } } public static void main(String[] args) { try { System.exit(ToolRunner.run(new NameReference(), args)); } catch (Exception e) { e.printStackTrace(); } } }
新建测试数据文件
提交文件到hadoop文件系统中
确认文件提交成功
执行MapReduce分析数据
能够经过web查看执行进度
http://192.168.1.113:8088/cluster/apps
查看执行结果
也能够经过日志查看执行过程
014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
014399999999999/1994-07-31/35
提交数据文件到文件系统中
代码解析数据文件
package com.jf.mapreduce; import org.apache.hadoop.io.Text; public class WeatherRecordParser { private String stationId; private String year; private int temperature = -999; private boolean valid; /** * 解析数据 * * @param line * 气象站/年月日/温度 * 014399999999999/1992-01-31/10 */ public void parse(String line) { String[] strs = line.split("/"); if (strs.length == 3) { if (strs[0] != null && strs[0].length() > 0) { stationId = strs[0]; } if (strs[1] != null && strs[1].length() > 0) { year = strs[1].substring(0, 4); } if (strs[2] != null && strs[2].length() > 0) { temperature = Integer.parseInt(strs[2]); } if (stationId != null && year != null & temperature > -999) { valid = true; } } } public void parse(Text value) { parse(value.toString()); } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } public String getYear() { return year; } public void setYear(String year) { this.year = year; } public int getTemperature() { return temperature; } public void setTemperature(int temperature) { this.temperature = temperature; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } }
统计每一年最大气温
package com.jf.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 获取每一年最高温度 * * @author Administrator * */ public class MaxTemperatureByYear extends Configured implements Tool { private static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private WeatherRecordParser parser = null; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { parser = new WeatherRecordParser(); parser.parse(value); if (parser.isValid()) { Text resultKey = new Text(parser.getYear()); IntWritable resultValue = new IntWritable(parser.getTemperature()); System.out.println("map:resultKey=" + resultKey + ",resultValue=" + resultValue); context.write(resultKey, resultValue); } } } private static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 获取相同key中最大的值 int max = Integer.MIN_VALUE; for (IntWritable intWritable : values) { if (intWritable.get() > max) { max = intWritable.get(); } } System.out.println("reducer:key=" + key + ",value=" + max); context.write(key, new IntWritable(max)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); // 构建做业所处理数据的输入输出路径 Path input = new Path(conf.get("input")); Path output = new Path(conf.get("output")); // 构建做业配置 Job job = Job.getInstance(conf, MaxTemperatureByYear.class.getName()); // 设置做业所要执行的类 job.setJarByClass(MaxTemperatureByYear.class); // 设置自定义的mapper类,以及tapper类的输出key和value类型。 job.setMapperClass(MaxTempMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置自定义的Reducer类以及输出时的类型 job.setReducerClass(MaxTempReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置读取最原始数据的格式信息,以及数据输出到HDFS集群中的格式信息 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job, input); TextOutputFormat.setOutputPath(job, output); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new MaxTemperatureByYear(), args)); } }
提交任务执行
File System Counters FILE: Number of bytes read=237 FILE: Number of bytes written=233027 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=755 HDFS: Number of bytes written=168 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=7929 Total time spent by all reduces in occupied slots (ms)=8445 Total time spent by all map tasks (ms)=7929 Total time spent by all reduce tasks (ms)=8445 Total vcore-seconds taken by all map tasks=7929 Total vcore-seconds taken by all reduce tasks=8445 Total megabyte-seconds taken by all map tasks=8119296 Total megabyte-seconds taken by all reduce tasks=8647680 Map-Reduce Framework Map input records=21 Map output records=21 Map output bytes=189 Map output materialized bytes=237 Input split bytes=106 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=237 Reduce input records=21 Reduce output records=21 Spilled Records=42 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=221 CPU time spent (ms)=2330 Physical memory (bytes) snapshot=310419456 Virtual memory (bytes) snapshot=1687691264 Total committed heap usage (bytes)=164040704 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=649 File Output Format Counters Bytes Written=168
能够经过web页面查看执行状态
查看执行日志
map计算日志输出
reduce计算统计日志
在文件系统中查看执行结果
修改配置文件:yarn-site.xml
从新执行时能够看到reduce个数为2
测试执行
任务分配:
①假设咱们有一个HDFS集群有4个节点分别是us1,us2,us3,us4。Yarn集群的主节点在分配资源的时候,当你客户端将做业提交的时候,resourcemanager在分配资源(或者说分配做业)的时候,尽可能将应用程序分发到有数据的节点上。这样就避免了数据在节点与节点之间传输。
②那么在us1,us2,us3中都至少有一个map任务,当map输出后通过洗牌,会根据key值的不一样生成不少组以key不一样的数据,好比咱们输出了(k21 : [v21]),(k22 : [v22])。咱们知道前面的map是并行执行的(多个map同时运行,由于处理的数据在不一样的数据块),当咱们的reduce为默认的时候是有1个,是有一个reduce因此不多是并行。咱们的reduce只有一个,而又两组数据那么哪一个先执行?Hadoop是这样规定的,咱们对数据进行分组是根据key值来分组的。那么Hadoop会让这一系列的key去比较大小,最小的先进入执行,执行完成后,按照从小到大去执行。
③当reduce任务执行完成以后会生成一个文件:part-r-00000
若是咱们有2个reduce,也有2组数据,那么这个并行计算如何进行。
Hadoop会让每一组数据的key值得hash值去和reduce的个数取余,余数是几那么就进入哪一个reduce。固然前提是给reduce编号(编号是Hadoop内部本身会去编)。
第一个reduce生成的是part-r-00000,第二个则是part-r-00001(后面的00000和00001就是reduce的编号)。例如:当第一组数据key的hash值与reduce个数取余为0则会让第一个reduce执行,当第二组数据key的hash值与reduce个数取余也为0,一样会让第一个reduce执行。这样第二个reduce一样会生成一个结果文件,第一个文件里面存放的是第一组和第二组数据结果,第二个文件为空。
数据分组和数据分片
①数据分片:
咱们把进入map端的数据叫作数据分片。每个数据块进入MapReudce中的map程序的时候,咱们把它叫作数据分片。
那什么样的数据是一个数据分片?HDFS集群上的一个数据块的数据对应咱们所说的数据分片。
也就是每个数据分片由每个map任务去处理。
②数据分组:
数据通过map处理以后分红不一样的组造成数据的过程叫作数据分组。