MapReduce divides job execution into two stages: a Map stage and a Reduce stage.
The Map stage consists of a certain number of Map Tasks:
Input format parsing: InputFormat
Input data processing: Mapper
Data partitioning: Partitioner (see the sketch after this outline)
The Reduce stage consists of a certain number of Reduce Tasks:
Remote copying of data (shuffle)
Sorting of data by key
Data processing: Reducer
Data output format: OutputFormat
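After the map function emits its <k2,v2> pairs, the Partitioner decides which Reduce Task each key goes to; by default Hadoop uses HashPartitioner, which hashes the key modulo the number of reduce tasks. Below is a minimal sketch of a custom partitioner, assuming the Text/IntWritable key/value types used in the WordCount example later in this section; the class name WordLengthPartitioner is illustrative and not part of the original example.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative custom partitioner: routes words to reduce tasks by word length
// instead of by the key's hash (the default HashPartitioner behavior).
public class WordLengthPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // getLength() is the byte length of the UTF-8 encoded word
        return key.getLength() % numPartitions;
    }
}

A job would opt into it with job.setPartitionerClass(WordLengthPartitioner.class).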
Map stage
InputFormat (default: TextInputFormat)
Mapper
Combiner (a local Reducer; see the sketch after this outline)
Partitioner
Reduce stage
Reducer
OutputFormat (default: TextOutputFormat)
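The Combiner runs a reduce-style aggregation on each map task's local output before the shuffle, which cuts the amount of data copied to the reducers. When the reduce logic is associative and commutative, as it is for word counting, the reducer class itself can usually be reused as the combiner. A minimal sketch, assuming the WordCountTest job defined later in this section:

// One extra line in the job setup of the WordCountTest example below
// enables map-side local aggregation; summing counts is safe to pre-combine.
job.setCombinerClass(MyReduce.class);

With the combiner enabled, a map output of <hello,1><hello,1> is pre-aggregated to <hello,2> before being shuffled to the reducer.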
The Java programming interface comes in two generations:
Old API: in the Java package org.apache.hadoop.mapred
New API: in the Java package org.apache.hadoop.mapreduce
The new API has better extensibility.
The two interfaces differ only in the form exposed to the user; the internal execution engine is the same.
Starting with Hadoop 1.0.0, every release includes both the old and the new API.
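To make the difference concrete, here is a sketch of a word-count mapper written against the old org.apache.hadoop.mapred API: Mapper is an interface rather than a class, and output goes through an OutputCollector instead of the new API's Context. The class name OldApiWordCountMapper is illustrative and not part of the examples below.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API word-count mapper: implement the Mapper interface and emit via OutputCollector
public class OldApiWordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        for (String w : value.toString().split(" ")) {
            word.set(w);
            output.collect(word, one);
        }
    }
}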
The WordCount problem: map stage
The WordCount problem: reduce stage
The WordCount problem: mapper design and implementation
The WordCount problem: reducer design and implementation
The WordCount problem: data flow
package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word count
 * @author huang
 */
public class WordCountTest {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Define the two outputs up front: k2, v2
        Text k2 = new Text();
        IntWritable v2 = new IntWritable();

        /*
         * hello you
         * hello me
         *
         * 1. <k1,v1> arrives as <0,hello you>, <10,hello me>
         *    and the map function turns it into
         * 2. <k2,v2> --> <hello,1> <you,1> <hello,1> <me,1>
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Process each line of input and pull out the words
            String[] words = value.toString().split(" ");
            for (String word : words) {
                k2.set(word);          // word is each word on the line
                v2.set(1);             // each occurrence counts as 1
                context.write(k2, v2); // emit
            }
        }
    }

    // 3. All <k2,v2> pairs are partitioned.
    // 4. After the shuffle stage the result is <hello,{1,1}> <me,{1}> <you,{1}>.
    //    Steps 3 and 4 are handled by the Hadoop framework itself.

    // reduce
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Define the output value
            IntWritable v3 = new IntWritable();
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v3.set(count);
            // Emit the result
            context.write(key, v3);
        }
    }

    // With the map and reduce functions written, wire them together and hand the job to MapReduce
    public static void main(String[] args) throws Exception {
        // Load configuration
        Configuration conf = new Configuration();
        // Set up the job
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountTest.class);
        // Specify the mapper/reducer classes the job should use
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        // Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the directory holding the job's input files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
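A typical way to run the job above, assuming it has been packaged into a jar (the jar name wordcount.jar and the HDFS paths /input and /output are illustrative):

hadoop jar wordcount.jar com.vip.WordCountTest /input /output
hdfs dfs -cat /output/part-r-00000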
package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Find the maximum value
public class MapReduceCaseMax extends Configured implements Tool {

    // The map implementation
    public static class MaxMapper extends Mapper<Object, Text, LongWritable, NullWritable> {
        // Start from the smallest possible value
        long max = Long.MIN_VALUE;

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tokenize the line; the default delimiters are spaces and tabs.
            // Comparing one token per iteration avoids failing on lines with an odd number of values.
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                long n = Long.parseLong(st.nextToken());
                if (n > max) {
                    max = n;
                }
            }
        }

        // Emit this map task's maximum once all of its input has been processed
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        /* Set up the job and its main class */
        Job job = Job.getInstance(getConf(), "MaxFiles");
        job.setJarByClass(MapReduceCaseMax.class);
        /* Set the mapper class */
        job.setMapperClass(MaxMapper.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for it to finish */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceCaseMax(), args);
        System.exit(res);
    }
}
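Because MapReduceCaseMax is launched through ToolRunner, generic Hadoop options such as -D key=value can be passed before the program arguments and are picked up via getConf(). A usage example, with an illustrative jar name and paths:

hadoop jar max.jar com.vip.MapReduceCaseMax -D mapreduce.job.reduces=1 /input /output

Note that each map task emits only its own maximum in cleanup(); since no reducer class is set, the single default (identity) reduce task writes the per-mapper maxima out sorted by key, so the global maximum is the last line of part-r-00000.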