本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,博主为石山园,博客地址为 http://www.cnblogs.com/shishanyuan 。该系列课程是应邀实验楼整理编写的,这里须要赞一下实验楼提供了学习的新方式,能够边看博客边上机实验,课程地址为 https://www.shiyanlou.com/courses/237java
【注】该系列所使用到安装包、测试数据和代码都可在百度网盘下载,具体地址为 http://pan.baidu.com/s/10PnDs,下载该PDF文件linux
部署节点操做系统为CentOS,防火墙和SElinux禁用,建立了一个shiyanlou用户并在系统根目录下建立/app目录,用于存放Hadoop等组件运行包。由于该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(通常作法是root用户在根目录下建立/app目录,并修改该目录拥有者为shiyanlou(chown –R shiyanlou:shiyanlou /app)。算法
Hadoop搭建环境:apache
l 虚拟机操做系统: CentOS6.6 64位,单核,1G内存编程
l JDK:1.7.0_55 64位网络
l Hadoop:1.1.2app
MapReduce 是现今一个很是流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算做业时,它会首先把计算做业拆分红若干个Map 任务,而后分配到不一样的节点上去执行,每个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会做为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一块儿并输出。从高层抽象来看,MapReduce的数据流图以下图所示:框架
1. 每一个输入分片会让一个map任务来处理,默认状况下,以HDFS的一个块的大小(默认为64M)为一个分片,固然咱们也能够设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中建立一个溢出文件,将该缓冲区中的数据写入这个文件;jsp
2. 在写入磁盘以前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样作是为了不有些reduce任务分配到大量数据,而有些reduce任务却分到不多数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。而后对每一个分区中的数据进行排序,若是此时设置了Combiner,将排序后的结果进行Combia操做,这样作的目的是让尽量少的数据写入到磁盘;编程语言
3. 当map任务输出最后一个记录时,可能会有不少的溢出文件,这时须要将这些文件合并。合并的过程当中会不断地进行排序和combia操做,目的有两个:
l尽可能减小每次写入磁盘的数据量
l尽可能减小下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减小网络传输的数据量,这里能够将数据压缩,只要将mapred.compress.map.out设置为true就能够了
4. 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪一个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。因此JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就能够了。
1. Reduce会接收到不一样map任务传来的数据,而且每一个map传来的数据都是有序的。若是reduce端接受的数据量至关小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用做此用途的堆空间的百分比),若是数据量超过了该缓冲区大小的必定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中;
2. 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样作是为了给后面的合并节省时间。其实无论在map端仍是reduce端,MapReduce都是反复地执行排序,合并操做;
3. 合并的过程当中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽量地少,而且最后一次合并的结果并无写入磁盘,而是直接输入到reduce函数。
1.在集群中的任意一个节点提交MapReduce程序;
2.JobClient收到做业后,JobClient向JobTracker请求获取一个Job ID;
3.将运行做业所须要的资源文件复制到HDFS上(包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息),这些文件都存放在JobTracker专门为该做业建立的文件夹中,文件夹名为该做业的Job ID;
4.得到做业ID后,提交做业;
5.JobTracker接收到做业后,将其放在一个做业队列里,等待做业调度器对其进行调度,看成业调度器根据本身的调度算法调度到该做业时,会根据输入划分信息为每一个划分建立一个map任务,并将map任务分配给TaskTracker执行;
6.对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里须要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”;
7.TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着不少的信息,好比当前map任务完成的进度等信息。当JobTracker收到做业的最后一个任务完成信息时,便把该做业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户;
8.运行的TaskTracker从HDFS中获取运行所须要的资源,这些资源包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分等信息;
9.TaskTracker获取资源后启动新的JVM虚拟机;
10. 运行每个任务;
下载气象数据集部分数据,写一个Map-Reduce做业,求每一年的最低温度
1 import org.apache.hadoop.fs.Path; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 7 8 public class MinTemperature { 9 10 public static void main(String[] args) throws Exception { 11 if(args.length != 2) { 12 System.err.println("Usage: MinTemperature<input path> <output path>"); 13 System.exit(-1); 14 } 15 16 Job job = new Job(); 17 job.setJarByClass(MinTemperature.class); 18 job.setJobName("Min temperature"); 19 FileInputFormat.addInputPath(job, new Path(args[0])); 20 FileOutputFormat.setOutputPath(job, new Path(args[1])); 21 job.setMapperClass(MinTemperatureMapper.class); 22 job.setReducerClass(MinTemperatureReducer.class); 23 job.setOutputKeyClass(Text.class); 24 job.setOutputValueClass(IntWritable.class); 25 System.exit(job.waitForCompletion(true) ? 0 : 1); 26 } 27 }
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Mapper; 6 7 public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 8 9 private static final int MISSING = 9999; 10 11 @Override 12 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 13 14 String line = value.toString(); 15 String year = line.substring(15, 19); 16 17 int airTemperature; 18 if(line.charAt(87) == '+') { 19 airTemperature = Integer.parseInt(line.substring(88, 92)); 20 } else { 21 airTemperature = Integer.parseInt(line.substring(87, 92)); 22 } 23 24 String quality = line.substring(92, 93); 25 if(airTemperature != MISSING && quality.matches("[01459]")) { 26 context.write(new Text(year), new IntWritable(airTemperature)); 27 } 28 } 29 }
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 7 8 @Override 9 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 10 11 int minValue = Integer.MAX_VALUE; 12 for(IntWritable value : values) { 13 minValue = Math.min(minValue, value.get()); 14 } 15 context.write(key, new IntWritable(minValue)); 16 } 17 }
进入/app/hadoop-1.1.2/myclass目录,在该目录中创建MinTemperature.java、MinTemperatureMapper.java和MinTemperatureReducer.java代码文件,执行命令以下:
cd /app/hadoop-1.1.2/myclass/
vi MinTemperature.java
vi MinTemperatureMapper.java
vi MinTemperatureReducer.java
MinTemperature.java:
MinTemperatureMapper.java:
MinTemperatureReducer.java:
在/app/hadoop-1.1.2/myclass目录中,使用以下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java
把编译好class文件打包,不然在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:
jar cvf ./MinTemperature.jar ./Min*.class
mv *.jar ..
rm Min*.class
把NCDC气象数据解压,并使用zcat命令把这些数据文件解压并合并到一个temperature.txt文件中
cd /home/shiyanlou
unzip temperature
cd temperature
zcat *.gz > temperature.txt
气象数据具体的下载地址为 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,该数据包括1900年到如今全部年份的气象数据,大小大概有70多个G,为了测试简单,咱们这里选取一部分的数据进行测试。合并后把这个文件上传到HDFS文件系统的/class5/in目录中:
hadoop fs -mkdir -p /class5/in
hadoop fs -copyFromLocal temperature.txt /class5/in
hadoop fs -ls /class5/in
以jar的方式启动MapReduce任务,执行输出目录为/class5/out:
cd /app/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /class5/in/temperature.txt /class5/out
执行成功后,查看/class5/out目录中是否存在运行结果,使用cat查看结果(温度须要除以10):
hadoop fs -ls /class5/out
hadoop fs -cat /class5/out/part-r-00000
1.查看jobtracker.jsp
http://XX. XXX.XX.XXX:50030/jobtracker.jsp
查看已经完成的做业任务:
任务的详细信息:
2. 查看dfshealth.jsp
http://XX. XXX.XX.XXX:50070/dfshealth.jsp
分别查看HDFS文件系统和日志
若是求温度的平均值,能使用combiner吗?有没有变通的方法?
不能直接使用,由于求平均值和前面求最值存在差别,各局部最值的最值仍是等于总体的最值的,可是对于平均值而言,各局部平均值的平均值将再也不是总体的平均值了,因此不能直接用combiner。能够经过变通的办法使用combiner来计算平均值,即在combiner的键值对中不直接存储最后的平均值,而是存储全部值的和个数,最后在reducer输出时再用和除以个数获得平均值。
1 import org.apache.hadoop.fs.Path; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 7 8 public class AvgTemperature { 9 10 public static void main(String[] args) throws Exception { 11 12 if(args.length != 2) { 13 System.out.println("Usage: AvgTemperatrue <input path><output path>"); 14 System.exit(-1); 15 } 16 17 Job job = new Job(); 18 job.setJarByClass(AvgTemperature.class); 19 job.setJobName("Avg Temperature"); 20 FileInputFormat.addInputPath(job, new Path(args[0])); 21 FileOutputFormat.setOutputPath(job, new Path(args[1])); 22 23 job.setMapperClass(AvgTemperatureMapper.class); 24 job.setCombinerClass(AvgTemperatureCombiner.class); 25 job.setReducerClass(AvgTemperatureReducer.class); 26 27 job.setMapOutputKeyClass(Text.class); 28 job.setMapOutputValueClass(Text.class); 29 30 job.setOutputKeyClass(Text.class); 31 job.setOutputValueClass(IntWritable.class); 32 33 System.exit(job.waitForCompletion(true) ? 0 : 1); 34 } 35 }
4.3.2 AvgTemperatureMapper.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Mapper; 6 7 public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> { 8 9 private static final int MISSING = 9999; 10 11 @Override 12 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 13 14 String line = value.toString(); 15 String year = line.substring(15, 19); 16 17 int airTemperature; 18 if(line.charAt(87) == '+') { 19 airTemperature = Integer.parseInt(line.substring(88, 92)); 20 } else { 21 airTemperature = Integer.parseInt(line.substring(87, 92)); 22 } 23 24 String quality = line.substring(92, 93); 25 if(airTemperature != MISSING && !quality.matches("[01459]")) { 26 context.write(new Text(year), new Text(String.valueOf(airTemperature))); 27 } 28 } 29 }
4.3.3 AvgTemperatureCombiner.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.Text; 3 import org.apache.hadoop.mapreduce.Reducer; 4 5 public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{ 6 7 @Override 8 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 9 10 double sumValue = 0; 11 long numValue = 0; 12 13 for(Text value : values) { 14 sumValue += Double.parseDouble(value.toString()); 15 numValue ++; 16 } 17 18 context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue))); 19 } 20 }
4.3.4 AvgTemperatureReducer.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{ 7 8 @Override 9 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 10 11 double sumValue = 0; 12 long numValue = 0; 13 int avgValue = 0; 14 15 for(Text value : values) { 16 String[] valueAll = value.toString().split(","); 17 sumValue += Double.parseDouble(valueAll[0]); 18 numValue += Integer.parseInt(valueAll[1]); 19 } 20 21 avgValue = (int)(sumValue/numValue); 22 context.write(key, new IntWritable(avgValue)); 23 } 24 }
进入/app/hadoop-1.1.2/myclass目录,在该目录中创建AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代码文件,代码内容为4.3所示,执行命令以下:
cd /app/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java
AvgTemperature.java:
AvgTemperatureMapper.java:
AvgTemperatureCombiner.java:
AvgTemperatureReducer.java:
在/app/hadoop-1.1.2/myclass目录中,使用以下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar Avg*.java
把编译好class文件打包,不然在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:
jar cvf ./AvgTemperature.jar ./Avg*.class
ls
mv *.jar ..
rm Avg*.class
数据使用做业2求每一年最低温度的气象数据,数据在HDFS位置为/class5/in/temperature.txt,以jar的方式启动MapReduce任务,执行输出目录为/class5/out2:
cd /app/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /class5/in/temperature.txt /class5/out2
执行成功后,查看/class5/out2目录中是否存在运行结果,使用cat查看结果(温度须要除以10):
hadoop fs -ls /class5/out2
hadoop fs -cat /class5/out2/part-r-00000