import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public WordCount(){} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 调用格式 hadoop jar .jar <input dir path 1 > <input dir path 2 > ... <output dir path> // 输入可能有多个文件夹 String[] otherArgs = (new GenericOptionsParser(conf,args)).getRemainingArgs(); if(otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } // 建立一个job 实例 Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); // 指定继承了Mapper并实现了map方法的类 job.setMapperClass(WordCount.TokenizerMapper.class); // 指定合并操做(实际上就是本地执行的Reduce操做), 可选 job.setCombinerClass(WordCount.TokenizerReducer.class); // 指定分区操做 Map在根据分区类将不一样的key映射到相应的分区 默认根据key值,哈希分区 // 须要该分区的Reduce 会根据JobTracker 对Map的监控 ,当Map结束后到相应的分区经过http取数据 // 可选 ,默认哈希分区 // job.setPartitionerClass(xxx.class); // 排序将根据Key数据类型的内部Comparator方法自动排序 // 指定分组条件, 知足一样条件的key值将被分到同一个组,排序在最前的key值 将做为该组的key // 我的理解是reduce端拉取数据后的归并操做 从 <key,value1>, <key, value2 >... => <key, value-list> // 默认key 彻底相同为一组 // job.setOutputKeyComparatorClass(XXX.class); // 指定Reduce操做 job.setReducerClass(WordCount.TokenizerReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i =0 ;i <otherArgs.length-1;i++){ FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final IntWritable one = new IntWritable(1); private Text word = new Text(); public TokenizerMapper(){} public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); for(String word_tmp : words){ this.word.set(word_tmp); context.write(word, one); } } } public static class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(0); public TokenizerReducer(){} public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val: values){ sum+=val.get(); } result.set(sum); context.write(key, result); } } }
Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程html
Map端输出要通过分区->排序->(合并->) 而后各分区Merge到一块儿 等待Reduce 取走各分区的数据java
(1)分区 Partition
默认分区是哈希分区,但存在数据倾斜问题,即有些key值的数据很是多,所以会影响Map效率,所以能够经过自定义分区类平衡数据分布。哪一个key到哪一个Reducer的分配过程,是由Partitioner规定的 。也就是说一个map其本地有着多个分区的数据,不一样分区的数据会被对应不一样的reducer拉取。apache
(2)排序 Sort
默认根据key的数据类型的内置Comparator排序,本身实现的数据类型须要自定义Comparator函数
(3)合并 Combine
map操做后会产生大量的<key,value>键值对,而且可能存在重复的键值对,而这会影响传递数据的效率(copy过程),所以 在map端能够经过本地的合并操做(能够看做本地的一次reduce), 合并一样key的键值对app
(1)Reduce端的copy过程
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会获得消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。
(2) 排序 分组(归并)
reduce端从不一样的map拉取的数据,数据确定须要通过再一次排序才能保证其有效性。因为分区策略的缘由,同一分区内的key值可能不一样或不知足咱们处理数据的需求, 所以咱们须要对数据进行分组,我我的理解为其实就是大数据技术原理课程讲的归并操做,即相同key值或知足相同条件的key值 合并为一个<key, value-list>,其中key值为排序的第一个key值函数
参考资料
Partitioner与自定义Partitioner
Shuffle过程那点事儿
hadoop的分区、分组与排序的理解oop