创建一个包:cn.itcast.hadoop.mr.wordcount。java
三个类:WCmapper、WCReducer、WCRunner。apache
WCmapper数组
package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //包没有导进来,硬着头皮看 //4个泛型中,前两个是指定 mapper 输入数据的类型,keyin 是输入的 key 类型,valuein 是输入的 value 类型 //map 和 reduce 的数据输入输出都是以 key-value 形式封装的 //默认状况下,框架传递给咱们的 mapper 的输入数据中,key 是要处理的文本一行的起始偏移量,这一行的内容做为 value public class WCmapper extends Mapper<Long, String, String, Long> { // mapreduce框架每读一行数据就调用一次该方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 具体业务逻辑就写在这个方法体中,并且咱们业务处理的数据已经被框架传递进来,在方法的参数中key-value // key 是这一行数据的其实偏移量,value 是这一行的文本内容 // 将这一行的内容转化为 String类型 String line = value.toString(); // 对这一行的文本按照特定分隔符切分 String[] words = StringUtils.split(line, " "); // 遍历这个单词数组输出为 kv 形式,k:单词;v:1 for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }
WCReducer缓存
package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { // 框架在 map 处理完成以后,将对全部kv缓存起来,进行分组,而后传递一个组<key,value{}>,调用一次reduce方法 // <hello,{1,1,1,1,1,1.....}> @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; // 遍历value的list,进行累加求和 for (LongWritable value : values) { count += value.get(); } // 输出一个单词的统计结果 context.write(key, new LongWritable(count)); } }
WCRunnerapp
package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 用来描述一个特定的类 好比,该做业使用哪一个类做为逻辑处理中的map,哪一个做为MapReduce 开能够指定该做为要处理的数据所在的路径 * 还能够指定该做业输出的结果放到哪一个路径 .... * * @author duanhaitao@itcast.cn * */ public class WCRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); // 设置整个job所用类在哪一个jar包 wcjob.setJarByClass(WCRunner.class); // 本 job 使用的 mapper 和reducer的类 wcjob.setMapperClass(WCMapper.class); wcjob.setReducerClass(WCReducer.class); // ָ指定 reduce 的输出数据kv类型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(LongWritable.class); // 指定 mapper 的输出数据kv类型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(LongWritable.class); // 原始数据存放路径 FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend101:9000/wc/srcdata/")); // ָ指定要处理的输入数据存放路径 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend101:9000/wc/output3/")); // 指定处理结果的输出数据存放路径 wcjob.waitForCompletion(true); } }