一、Map任务处理html
1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每个键值对调用一次map函数。 <0,hello you> <10,hello me> java
1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。 <hello,1> <you,1> <hello,1> <me,1>apache
1.3 对1.2输出的<k,v>进行分区。默认分为一个区。Partitioner数组
1.4 溢写Split网络
1.5 对不一样分区中的数据进行排序(按照k)Sort。app
1.6 (可选)对分组后的数据进行归约。Combiner函数
combiner是一个可选的本地reducer,能够在map阶段聚合数据。combiner经过执行单个map范围内的聚合,减小经过网络传输的数据量。oop
例如,一个聚合的计数是每一个部分计数的总和,用户能够先将每一个中间结果取和,再将中间结果的和相加,从而获得最终结果。post
求平均值的时候不能用,由于123的平均是2,12平均再和3平均结果就不对了。Combiner应该用于那种Reduce的输入key/value与输出key/value类型彻底一致,且不影响最终结果的场景,好比累加,最大值等。url
1.7 合并Merge
二、Reduce任务处理
2.1 拉取数据Fetch
2.2 合并Merge
2.3 Reduce
三、WordCount代码
package mapreduce; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountApp { static final String INPUT_PATH = "hdfs://chaoren:9000/hello"; static final String OUT_PATH = "hdfs://chaoren:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUT_PATH); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath, true); } Job job = new Job(conf, WordCountApp.class.getSimpleName()); // 指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH); // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对 //job.setInputFormatClass(TextInputFormat.class); // 指定自定义的map类 job.setMapperClass(MyMapper.class); // map输出的<k,v>类型。若是<k3,v3>的类型与<k2,v2>类型一致,则能够省略 //job.setOutputKeyClass(Text.class); //job.setOutputValueClass(LongWritable.class); // 指定自定义reduce类 job.setReducerClass(MyReducer.class); // 指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath); // 指定输出文件的格式化类 //job.setOutputFormatClass(TextOutputFormat.class); // 分区 //job.setPartitionerClass(clz); // 排序、分组、归约 //job.setSortComparatorClass(clz); //job.setGroupingComparatorClass(clz); //job.setCombinerClass(clz); // 有一个reduce任务运行 //job.setNumReduceTasks(1); // 把job提交给jobtracker运行 job.waitForCompletion(true); } /** * * KEYIN 即K1 表示行的偏移量 * VALUEIN 即V1 表示行文本内容 * KEYOUT 即K2 表示行中出现的单词 * VALUEOUT 即V2 表示行中出现的单词的次数,固定值1 * */ static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException { String[] splited = v1.toString().split("\t"); for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } }; } /** * KEYIN 即K2 表示行中出现的单词 * VALUEIN 即V2 表示出现的单词的次数 * KEYOUT 即K3 表示行中出现的不一样单词 * VALUEOUT 即V3 表示行中出现的不一样单词的总次数 */ static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException { long times = 0L; for (LongWritable count : v2s) { times += count.get(); } ctx.write(k2, new LongWritable(times)); }; } }