Big Data (Hadoop MapReduce Code and Programming Model Explained)

The MapReduce Programming Model

MapReduce divides the whole execution process into two phases: a Map phase and a Reduce phase.

The Map phase consists of a certain number of Map Tasks:
   Input data parsing: InputFormat
   Input data processing: Mapper
   Data partitioning: Partitioner

The Reduce phase consists of a certain number of Reduce Tasks:
  Remote copy of map output (shuffle)
  Sorting of data by key
  Data processing: Reducer
  Data output format: OutputFormat

Map phase
    InputFormat (default: TextInputFormat)
    Mapper
    Combiner (a local Reducer)
    Partitioner
Reduce phase
    Reducer
    OutputFormat (default: TextOutputFormat)
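
Each of these pluggable components corresponds to a setter on the Job object. The following is a minimal driver sketch, not a definitive implementation: it assumes it lives in the same com.vip package as the WordCount classes shown later in this article (WordCountTest.MyMapper and WordCountTest.MyReduce), and WordPartitioner is a hypothetical custom partitioner added only to show where a Partitioner plugs in. TextInputFormat and TextOutputFormat are already the defaults and would not normally need to be set explicitly.

package com.vip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Sketch: shows where each component listed above is wired into a Job.
public class ComponentWiringSketch {

	// Hypothetical custom partitioner; same behavior as the default HashPartitioner
	// (hash of the key modulo the number of reduce tasks).
	public static class WordPartitioner extends Partitioner<Text, IntWritable> {
		@Override
		public int getPartition(Text key, IntWritable value, int numPartitions) {
			return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
		}
	}

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration(), "component wiring sketch");
		job.setJarByClass(ComponentWiringSketch.class);

		job.setInputFormatClass(TextInputFormat.class);			// input parsing (already the default)
		job.setMapperClass(WordCountTest.MyMapper.class);		// map-side processing
		job.setCombinerClass(WordCountTest.MyReduce.class);		// local reducer run on map output
		job.setPartitionerClass(WordPartitioner.class);			// routes each key to a reduce task
		job.setReducerClass(WordCountTest.MyReduce.class);		// reduce-side processing
		job.setOutputFormatClass(TextOutputFormat.class);		// output writing (already the default)

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}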

Java Programming Interfaces

The Java programming interface comes in two flavors:
    Old API: in the package org.apache.hadoop.mapred
    New API: in the package org.apache.hadoop.mapreduce
    The new API offers better extensibility.

    The two APIs differ only in the form they expose to the user; the underlying execution engine is the same.

 

Old and New Java APIs

Starting with Hadoop 1.0.0, every release includes both the old and the new API.
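
To illustrate the difference in form, here is a minimal sketch of the same trivial map logic (emit <line, 1>) written against both APIs. The class names OldApiLineMapper and NewApiLineMapper are made up for this example; in the old API the Mapper is an interface that writes through an OutputCollector, while in the new API it is a class that writes through a Context.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old API (org.apache.hadoop.mapred): Mapper is an interface, output goes through an OutputCollector.
class OldApiLineMapper extends MapReduceBase
		implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
		output.collect(value, new IntWritable(1));	// emit <line, 1>
	}
}

// New API (org.apache.hadoop.mapreduce): Mapper is a class, output goes through a Context.
class NewApiLineMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(value, new IntWritable(1));	// emit <line, 1>
	}
}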

Example 1: WordCount

WordCount: the map phase

WordCount: the reduce phase

WordCount: mapper design and implementation

WordCount: reducer design and implementation

WordCount: data flow
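
For the two sample lines used in the code comments below ("hello you" and "hello me"), the key/value pairs move through the job as follows:

Input <k1,v1>:        <0,hello you>  <10,hello me>    (k1 = byte offset of the line)
After map <k2,v2>:    <hello,1> <you,1> <hello,1> <me,1>
After shuffle/sort:   <hello,{1,1}> <me,{1}> <you,{1}>
After reduce <k3,v3>: <hello,2> <me,1> <you,1>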

Sample code

package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word count example
 * @author huang
 *
 */
public class WordCountTest {

	public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
		// Define the two map output objects, k2 and v2, up front so they can be reused
		Text k2 = new Text() ;
		IntWritable v2 = new IntWritable() ;
		
		/*
		 * hello you
		 * hello me
		 * 
		 * 1. The input <k1,v1> pairs look like <0,hello you>, <10,hello me>
		 *    (the key is the byte offset of each line).
		 * 2. The map function turns them into <k2,v2>:
		 *    <hello,1> <you,1> <hello,1> <me,1>
		 * */
		
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// Process each line and pull out the individual words
			String[] words = value.toString().split(" ");
			for (String word : words) {
				k2.set(word);			// word: a single word from the current line
				v2.set(1);				// each occurrence counts as 1
				context.write(k2, v2);	// emit <word, 1>
			}
		}
	}
	// 3. All the emitted <k2,v2> pairs are partitioned.
	// 4. After the shuffle phase the result is <hello,{1,1}> <me,{1}> <you,{1}>.
	//    Steps 3 and 4 are carried out by the Hadoop framework itself.
	// The reducer:
	public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			// Output value and running count for this key
			IntWritable v3 = new IntWritable() ;
			int count = 0 ;
			for (IntWritable value : values) {
				count += value.get() ;
			}
			v3.set(count);
			// Emit the final <word, count> pair
			context.write(key, v3);
		}
	}
	
	// With the map and reduce functions written, wire them into a job and hand it to MapReduce to run
	public static void main(String[] args) throws Exception {
		// Load the configuration
		Configuration conf = new Configuration() ;
		// Create the job
		Job job = Job.getInstance(conf, "word count") ;
		job.setJarByClass(WordCountTest.class);
		
		// Set the mapper/reducer classes the job should use
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		
		// Set the key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// Directory containing the job's raw input files
		FileInputFormat.addInputPath(job, new Path(args[0]));
		// Directory where the job's results will be written
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true)?0:1) ;
	}	
}
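
Once this class is packaged into a jar, the job can be submitted with a command of the form hadoop jar wordcount.jar com.vip.WordCountTest <input dir> <output dir> (the jar name and the two HDFS directories are placeholders); note that the output directory must not exist beforehand, otherwise FileOutputFormat fails the job.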

 

package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Find the maximum value in the input
public class MapReduceCaseMax extends Configured implements Tool{

	// The mapper
	public static class MaxMapper extends Mapper<Object, Text, LongWritable, NullWritable>{
		// Running maximum for this map task, initialized to the smallest possible long
		long max = Long.MIN_VALUE ;
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// Tokenize the line; StringTokenizer splits on whitespace (spaces and tabs) by default
			StringTokenizer st = new StringTokenizer(value.toString()) ;
			while(st.hasMoreTokens()){
				// Parse each number and keep the running maximum.
				// (The sample input has two numbers per line, but reading one token at a time
				// also handles any number of values and avoids a NoSuchElementException
				// when a line contains an odd number of tokens.)
				long n = Long.parseLong(st.nextToken()) ;
				if(n > max){
					max = n ;
				}
			}
		}
		
		// cleanup() runs once after all input records of this map task have been processed
		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		/* Create the job and set the main class */
		Job job = Job.getInstance(getConf(), "MaxFiles") ;
		job.setJarByClass(MapReduceCaseMax.class);
		
		/* Set the class that provides the map method */
		job.setMapperClass(MaxMapper.class);
		
		/* Set the output key and value types */
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		
		/* Set the input and output paths from the arguments */
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		/* Submit the job to the cluster and wait for it to complete */
		boolean isSuccess = job.waitForCompletion(true);
		
		return isSuccess ? 0 : 1 ;
	}
	
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new MapReduceCaseMax(), args) ;
		System.exit(res);
	}
}
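
One caveat not covered by the original example: if the input is large enough to be split across several map tasks, each task's cleanup() writes its own local maximum, and since no reducer class is set, the default identity reducer simply passes them all through, so the output may contain one value per mapper. The sketch below shows a hypothetical MaxReducer (not in the original) that would fold those local maxima into a single global maximum; using it would also require job.setReducerClass(MaxReducer.class) in run() and an import of org.apache.hadoop.mapreduce.Reducer, with the class nested inside MapReduceCaseMax alongside MaxMapper.

	// Hypothetical addition: reduces the per-mapper maxima to one global maximum.
	public static class MaxReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		private long max = Long.MIN_VALUE ;

		@Override
		protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			// Keys arrive sorted, but tracking the maximum explicitly keeps the intent obvious
			if(key.get() > max){
				max = key.get() ;
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Emit the single global maximum once all per-mapper maxima have been seen
			context.write(new LongWritable(max), NullWritable.get());
		}
	}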