【Hadoop】- MapReduce 代码工做过程

Hadoop MapReduce基础案例java

MapReduce:Hadoop分布式并行计算框架web

思想:分治法apache

通俗解释

工厂给客户交付货物1000吨,卡车A运量50吨,须要顺序20次,若是平时客户不忙20次运输所需的时间客户可以接受,忽然市场竞争激烈,工厂为了提供失效,每次运输从单台卡车运输提升到20台卡车运输,这样整个运量1次就搞定,Map Reduce相似,就是将一些廉价机器组成一个集群,每一个节点都处理整个做业的一部分,最后进行汇总,从而快速提升大数据的处理能力app

MapReduce工做流程图

这里写图片描述

Maprreduce工做流程

HDFS提供数据源,通过splitting将数据切割成数据片,表示为K-V数据模型做为Mapping的输入,Mapping对数据进行进一步的处理,并造成客户须要的数据K-V数据模型,Shuffing对Mapping产生的K-V数据模型根据key进行排序汇总,而后将数据传给Reduce做业,reduce对key-ValueList进行相应的处理,最终汇总出最终的结果框架


基础案例:统计文本中出现的单词及个数wordcount分布式

① :启动Hadoop集群:start-all.shide

这里写图片描述

② :准备测试的数据源函数

这里写图片描述

HDFS的存储目录工具

这里写图片描述

③ :建立Mapper:数据分割oop

package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable,Text,Text, IntWritable>{

	@Override
	public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
		
		String line = value.toString();
		StringTokenizer st = new StringTokenizer(line);  //分词工具类
		while(st.hasMoreTokens()){
			String word = st.nextToken();  //获取单词信息
			
			 /**
			  * 将数据写入shufer过程,用于reduce的结果合并
			  * new Text(word):做为key
			  * new IntWritable(1):做为value,reduce会对其进行合并
			  */
			context.write(new Text(word), new IntWritable(1));  
		}
	}
}

Mapper泛型参数解释:

  • LongWritable,Text,Text, IntWritable

  • LongWritable:Hadoop序列化的类型,可理解成Long的包装类,这里表示splitting Data的编号,hadoop默认按照”行”进行数据切割,这里能够近似理解为第N行

  • Text:表示输入数据的格式:这里文件中存放的是字符串,使用Text

  • Text:表示map函数输出的key的类型,所以是单词计数,因此是单词为key,Text类型

  • IntWritable:表示map函数的输出的value的类型,这里表示单词出现的次数

  • context.write(new Text(word), new IntWritable(1)); 表示每次出现一个单词就向shuffling输出数据:单词及次数1,也就是说每次单词出现一次就输出1

④ :建立Reducer:数据汇总

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text text, Iterable<IntWritable> iterable,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {

		int sum = 0;
		for(IntWritable intWritable:iterable){
			sum = sum+intWritable.get();  
		}
		context.write(new Text(text), new IntWritable(sum));
	}
}

Reducer泛型参数解释:

  • Text, IntWritable, Text, IntWritable

说明Mapper传递参数和Reducer的输出参数是字符串和整型,对应单词和单词次数

reduce方法形参解释

  • Text:表示Mapper传递的Key的值
  • Iterable:表示Mapper传递的Key对应的值列表

例如:Map传递的值多是片断
zhangsan 1
zhangsan 1
Map事后进入shuffling过程,shuffling会将Mapper的数据进行汇总,变成相似形式 zhangsan{1,1},也就是key-valueList数据模型,而后将其传递给reduce函数

int sum = 0;
for(IntWritable intWritable:iterable){
	sum = sum+intWritable.get();  
}
context.write(new Text(text), new IntWritable(sum));

对valueList集合中的值进行汇总,获得单个单词的累计出现次数

⑤ :建立Job测试类:Job做为Mapreduce做业的启动类,主要是将做业交给JobTracker,JobTacker经过调度Hadoop集群中的taskTracker进行做业处理

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *运行mapreduce做业:mapreduce打成jar包
 *控制台运行:hadoop jar jar包路径  JobRun类便可运行 
 *
 *MapReduce web访问端口:50030
 */
public class WcJobRun {

	public static void main(String[] args) {
		
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "localhost:9001");
		try{
		Job job = new Job();
		job.setJarByClass(WcJobRun.class); //设置启动做业类
		job.setMapperClass(WcMapper.class); //设置Map类
		job.setReducerClass(WcReducer.class);
		job.setMapOutputKeyClass(Text.class); //设置mapper输出的key类型
		job.setMapOutputValueClass(IntWritable.class); //设置mapper输出的value类型
		
		job.setNumReduceTasks(1); //设置Reduce Task的数量
		
		//设置mapreduce的输入和输出目录
		FileInputFormat.addInputPath(job, new Path("/user/squirrel/in"));
		FileOutputFormat.setOutputPath(job, new Path("/user/squirrel/out") );
		
		//等待mapreduce整个过程完成
		System.exit(job.waitForCompletion(true)?0:1);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

⑥ :MapReduce做业测试运行

Eclipse将相关类打包为jar文件,经过”hadoop jar 包名 job类名”运行mapreduce做业 命令:hadoop jar wc.jar mapreduce.WcJobRun

控制台日志:

这里写图片描述

web监控mapreduce做业过程:http://192.168.174.135:50030

这里写图片描述

这里写图片描述

这里写图片描述

查看MapReduce处理以后的文件:注意存储在HDFS文件系统上

这里写图片描述

注意:

MapReduce结果的存放目录以前不可以在HDFS文件存在,不然抛出异常,若是想提升MapReduce做业的灵活性彻底能够将Job类的HDFS输入和输出路径引用为main方法的形参args[0]、args[1],经过”hadoop jar wc.jar mapreduce.WcJobRun HDFS输入路径 HDFS输出路径”处理便可

相关文章
相关标签/搜索