学写代码的时候,咱们老是先从helloworld开始写起,那么学习Hadoop,咱们也必不可少的从helloworld开始,那么WordCount做为经典的Hadoop程序,能够做为咱们庖丁解牛的材料,进而从代码的角度学习一下mapreduce的实现过程。下面咱们就开始一步步的探索。java
先从源码看起,再一步步剖析apache
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "172.16.10.15:9001");//本身额外加的代码 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
你们能够看到整个源代码分为三个部分:网络
1. Mapapp
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
a) 定义一个本身的Map过程,TokenizerMapper 这个类名本身设定,这个类须要继承org.apache.hadoop.mapreduce包中的Mapper类,四个参数分别表示输入键key的参数类型,输入值value的参数类型,输出键key的参数类型,输出值value的参数类型。 值得注意的是Hadoop自己提供了一套可优化的网络序列化传输的基本类型,而不是用java内嵌的类型。这些类型都是在org.apache.hadoop.io包中。其中LongWritable类型至关于Long类型,Text类型至关于String类型,IntWritable至关于Integer类型。
b) map方法中参数value是指文本文件中的一行,参数key是为该行首字母相对于文本文件首地址的偏移量
c) StringTokenizer类是一个用来分隔String的应用类,相似于split。框架
//它的构造函数有三种: public StringTokenizer(String str) public StringTokenizer(String str,String delim) public StringTokenizer(String str,String delim,boolean returnDelims) //其中第一个参数为要分隔的String,第二个参数为分隔字符集合,第三个参数为分隔符是否做为标记返回,若是不指定分隔符,默认是'\t\n\r\f' //它的方法主要有三种: public boolean hasMoreTokens()//返回是否还有分隔符 public String nextToken()//返回从当前位置到下一个分隔符的字符串 public int countTokens()//返回nextToken方法被调用的次数
d) 通过StringTolenizer 处理以后会获得一个个 < word,1 > 这样的键值对,放在context里,Context用于输出内容的写入,读起来有点儿绕口,本身理解一下。eclipse
2. Reduce函数
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
a) 同mapper 过程同样,Reduce过程须要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。
b) reduce方法中输入参数key 指单个单词,values 指对应单词的计数值的列表
c) reduce 方法的目的就是对列表的值进行加和处理
d) 输出的是< key,value>,key 指单个单词,value 指对应单词的计数值的列表的值的总和。oop
3. Main学习
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "172.16.10.15:9001");//本身额外加的代码 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
a) Configuration conf = new Configuration(); 默认状况下,Configuration开始实例化的时候,会从Hadoop的配置文件里读取参数。
b) conf.set(“mapred.job.tracker”, “172.16.10.15:9001”);设置这句代码是因为咱们要把使用eclipse提交做业到Hadoop集群,因此手动添加Job运行地址。如果直接在Hadoop 集群进行运行,不用加这句代码。 并且你能够看到只要前三句使用了这个代码,因此这三句之后的代码才是全部Hadoop例子中都会包含的。
c) 接下来这一句也是读取参数,这里是从命令行参数里读取参数。
d) Job job = new Job(conf, “word count”); 在MapReduce处理过程当中,由Job对象负责管理和运行一个计算任务,而后经过Job的若干方法来对任务的参数进行设置。”word count”是Job的名字,(固然了,根据全部java语言规范规定的那样,你也能够用测试
Job job = new Job(); job.setJobName("Name");
的形式作声明)。
e) job.setJarByClass(WordCount.class);是根据WordCount类的位置设置Jar文件 。
为何要这么作?由于咱们在Hadoop集群上运行这个做业时候,要把代码打包成一个JAR文件,用以在集群上发布这个文件。Hadoop利用这个传递进去的类来查找包含它的JAR文件。
f) job.setMapperClass(TokenizerMapper.class);设置Mapper
g) job.setCombinerClass(IntSumReducer.class);设置Combiner,这里先使用Reduce类来进行Mapper 的中间结果的合并,可以减轻网络传输的压力。
h) job.setReducerClass(IntSumReducer.class);设置Reduce
i) job.setOutputKeyClass(Text.class);和 job.setOutputValueClass(IntWritable.class);分别是设置输出键的类型和设置输出值的类型
j) FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输入文件,它是otherArgs第一个参数
k) FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));设置输出文件,将输出结果写入这个文件里,它是otherArgs第二个参数 。
注意:在运行做业前这个输出目录不该该存在,不然Hadoop会报错并拒绝运行该做业。这种预防措施的目的是防止数据丢失(若是长时间运行的数据结果被意外覆盖,确定是很是恼人的)。
l) System.exit(job.waitForCompletion(true) ? 0 : 1);job执行,等待执行结果
4. 各个包的功能
到此为止,三大部分就分析完毕,而后再来看看引入的有哪些类:
a) package org.apache.hadoop.examples;Java 提供包机制管理代码,关键词是package, 包名字能够本身定,但不能重复。一般为了包的惟一性,推荐使用公司域名的逆序做为包,因而有了上面例子中的‘org.apache.hadoop’这样的包名。
b) import java.io.IOException; 凡是以java开头的包,在JDK1.7的API里能够找到类的资料。这里是从java.io中引入IOException,是一个输入输出异常类。
c) import java.util.StringTokenizer;这是从java.util包中引入的StringTokenizer类,是一个解析文本的类。具体用法上文中已提过了。
d) import org.apache.hadoop.conf.Configuration;凡是以org.apache.hadoop开头的包,在Hadoop1.2.1 的API文档能够找到类的资料。这里是从hadoop的conf包中引入Configuration类,它是一个读写和保存配置信息的类。
e) import org.apache.hadoop.fs.Path; Path类保存文件或者目录的路径字符串
f) import org.apache.hadoop.io.IntWritable; IntWritable是一个以类表示的可序化的整数。在java中,要表示一个整数,可使用int类型,也可使用integer类型,integer封装了int类型,且integer类是可序化的。但Hadoop认为integer的可序化不合适,因而实现了IntWritable。
g) import org.apache.hadoop.io.Text; 从io包中引入Text类,是一个存储字符串的可比较可序化的类。
h) import org.apache.hadoop.mapreduce.Job; 引入Job类,Hadoop中每一个须要执行的任务是一个Job,这个Job负责参数配置、设置MapReduce细节、提交到Hadoop集群、执行控制等操做。
i) import org.apache.hadoop.mapreduce.Mapper;引入Mapper类,负责MapReduce中的Map过程。
j) import org.apache.hadoop.mapreduce.Reducer;引入Reduce类,负责MapReduce中的Reduce过程。
k) import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;引入FileInputFormat类,主要功能是将文件进行切片。
l) import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;FileOutputFormat类是将输出结果写入文件。
m) import org.apache.hadoop.util.GenericOptionsParser;这个类负责解析命令行参数。
从代码的功能上,咱们已经对map reduce有了一个清晰的认识,那么wordcount程序具体是怎么执行的呢?
将文件file1.txt,file2.txt 上传到hdfs中的hdfsinput1文件夹里(上传的方式能够经过eclipse客户端,也能够经过Hadoop命令行),而后在eclipse上编写wordcount.java文件(也便是第一部分分析的源码)
因为测试用的文件较小,因此每一个文件为一个split,并将文件按行分割造成< key,value>,这一步由MapReduce框架自动完成,其中key值为该行首字母相对于文本文件首地址的偏移量。
将分割好的< key,value>对交给本身定义的map方法,输出新的< key,value>对。
获得map方法输出的< key,value>对后,进行Combine操做。这里Combine 执行的是Reduce的代码。
一样,在Reduce过程当中先对输入的数据进行排序,再交由自定义的reduce方法进行处理,获得新的< key,value>对,并做为WordCount的输出结果,输出结果存放在第一张图的lxnoutputssss文件夹下的part-r-00000里。