[MapReduce] WordCount 代码实例及具体执行过程说明

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
        public WordCount(){}

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                // 调用格式 hadoop  jar  .jar   <input dir path 1 >   <input dir path 2 >  ...  <output dir path>
                // 输入可能有多个文件夹
                String[] otherArgs =  (new GenericOptionsParser(conf,args)).getRemainingArgs();

                if(otherArgs.length < 2) {
                        System.err.println("Usage: wordcount <in> [<in>...] <out>");
                        System.exit(2);
                }
                // 建立一个job 实例
                Job job = Job.getInstance(conf, "word count");
                job.setJarByClass(WordCount.class);

                // 指定继承了Mapper并实现了map方法的类
                job.setMapperClass(WordCount.TokenizerMapper.class);

                // 指定合并操做(实际上就是本地执行的Reduce操做), 可选
                job.setCombinerClass(WordCount.TokenizerReducer.class);

                //  指定分区操做 Map在根据分区类将不一样的key映射到相应的分区 默认根据key值,哈希分区
                // 须要该分区的Reduce 会根据JobTracker 对Map的监控 ,当Map结束后到相应的分区经过http取数据
                // 可选 ,默认哈希分区
                // job.setPartitionerClass(xxx.class);

                // 排序将根据Key数据类型的内部Comparator方法自动排序

                //  指定分组条件, 知足一样条件的key值将被分到同一个组,排序在最前的key值 将做为该组的key
                //  我的理解是reduce端拉取数据后的归并操做 从 <key,value1>, <key, value2 >... => <key, value-list>
                // 默认key 彻底相同为一组
                // job.setOutputKeyComparatorClass(XXX.class);

                // 指定Reduce操做
                job.setReducerClass(WordCount.TokenizerReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                for(int i =0 ;i <otherArgs.length-1;i++){
                        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
                }
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
                System.exit(job.waitForCompletion(true)?0:1);

        }

        public  static  class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
                private final  IntWritable one = new IntWritable(1);
                private Text word = new Text();
                public TokenizerMapper(){}
                public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        String[] words = value.toString().split(" ");
                        for(String word_tmp : words){
                                this.word.set(word_tmp);
                                context.write(word, one);
                        }

                }
        }

        public static class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
                private IntWritable result = new IntWritable(0);
                public TokenizerReducer(){}
                public void reduce(Text key, Iterable<IntWritable> values,  Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        int sum = 0;
                        for(IntWritable val: values){
                                sum+=val.get();
                        }
                        result.set(sum);
                        context.write(key, result);
                }

        }

}

Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程html

Map端

Map端输出要通过分区->排序->(合并->) 而后各分区Merge到一块儿 等待Reduce 取走各分区的数据java

(1)分区 Partition
默认分区是哈希分区,但存在数据倾斜问题,即有些key值的数据很是多,所以会影响Map效率,所以能够经过自定义分区类平衡数据分布。哪一个key到哪一个Reducer的分配过程,是由Partitioner规定的 。也就是说一个map其本地有着多个分区的数据,不一样分区的数据会被对应不一样的reducer拉取。apache

(2)排序 Sort
默认根据key的数据类型的内置Comparator排序,本身实现的数据类型须要自定义Comparator函数
(3)合并 Combine
map操做后会产生大量的<key,value>键值对,而且可能存在重复的键值对,而这会影响传递数据的效率(copy过程),所以 在map端能够经过本地的合并操做(能够看做本地的一次reduce), 合并一样key的键值对app

Reduce端

(1)Reduce端的copy过程
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会获得消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。
(2) 排序 分组(归并)
reduce端从不一样的map拉取的数据,数据确定须要通过再一次排序才能保证其有效性。因为分区策略的缘由,同一分区内的key值可能不一样或不知足咱们处理数据的需求, 所以咱们须要对数据进行分组,我我的理解为其实就是大数据技术原理课程讲的归并操做,即相同key值或知足相同条件的key值 合并为一个<key, value-list>,其中key值为排序的第一个key值函数

参考资料
Partitioner与自定义Partitioner
Shuffle过程那点事儿
hadoop的分区、分组与排序的理解oop

相关文章
相关标签/搜索