MapReduce(一) mapreduce基础入门

1、mapreduce入门java

  一、什么是mapreduce     面试

首先让咱们来重温一下 hadoop 的四大组件:
HDFS:分布式存储系统
MapReduce:分布式计算系统
YARN: hadoop 的资源调度系统
Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等apache

Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架
Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个 hadoop 集群上
   二、为何须要mapreduce     编程

    为何须要 MapReduce?
     (1) 海量数据在单机上处理由于硬件资源限制,没法胜任
     (2) 而一旦将单机版程序扩展到集群来分布式运行,将极大增长程序的复杂度和开发难度
     (3) 引入 MapReduce 框架后,开发人员能够将绝大部分工做集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理缓存

可见在程序由单机版扩成分布式版时,会引入大量的复杂工做。为了提升开发效率,能够将 分布式程序中的公共功能封装成框架,让开发人员能够将精力集中于业务逻辑。
Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会
涉及的到的内容都封装进了,让用户只用专一本身的业务逻辑代码的开发。 它对应以上问题
的总体结构以下:网络

      三、mapreduce程序运行实例并发

在 MapReduce 组件里, 官方给咱们提供了一些样例程序,其中很是有名的就是 wordcount 和 pi 程序。这些 MapReduce 程序的代码都在 hadoop-mapreduce-examples-2.6.4.jar 包里, 这 个 jar 包在 hadoop 安装目录下的/share/hadoop/mapreduce/目录里

app

下面咱们使用 hadoop 命令来试跑例子程序,看看运行效果
先看 MapReduce 程序求 pi 的程序:框架

那除了这两个程序之外,还有没有官方提供的其余程序呢,还有就是它们的源码在哪里呢?
咱们打开 mapreduce 的源码工程,里面有一个 hadoop-mapreduce-project 项目:jvm

    四、mapreduce示例编写及编码规范

上一步,咱们查看了 WordCount 这个 MapReduce 程序的源码编写,能够得出几点结论:
(1) 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,好比指定 Mapper 类和 Reducer 类
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
(2) 该程序中的 TokenizerMapper 类继承了 Mapper 类
(3) 该程序中的 IntSumReducer 类继承了 Reducer 类

总结: MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分 编写该 MapReduce 程序的业务逻辑,而且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类

  

(1) 用户编写的程序分红三个部分: Mapper, Reducer, Driver(提交运行 MR 程序的客户端)
(2) Mapper 的输入数据是 KV 对的形式( KV 的类型可自定义)
(3) Mapper 的输出数据是 KV 对的形式( KV 的类型可自定义)
(4) Mapper 中的业务逻辑写在 map()方法中
(5) map()方法( maptask 进程)对每个<K,V>调用一次
(6) Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
(7) Reducer 的业务逻辑写在 reduce()方法中
(8) Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法
(9) 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
(10) 整个程序须要一个 Drvier 来进行提交,提交的是一个描述了各类必要信息的 job 对象

WordCount 的业务逻辑: 
一、 maptask 阶段处理每一个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,好比单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总
二、 reducetask 阶段将接受 maptask 的结果, 来作汇总计数

其次reduce

       五、mapreduce运行方式与debug

        运行方式:

 

 2、mapreduce的核心程序运行机制

     一、概述

一个完整的 mapreduce 程序在分布式运行时有两类实例进程:
 (1) MRAppMaster:负责整个程序的过程调度及状态协调  (该进程在yarn节点上)
 (2) Yarnchild:负责 map 阶段的整个数据处理流程         
 (3) Yarnchild:负责 reduce 阶段的整个数据处理流程
以上两个阶段 maptask 和 reducetask 的进程都是 yarnchild,并非说这 maptask 和 reducetask 就跑在同一个 yarnchild 进行里
(Yarnchild进程在运行该命令的节点上)

      二、mapreduce程序的运行流程(经典面试题)

(1) 一个 mr 程序启动的时候,最早启动的是 MRAppMaster, MRAppMaster 启动后根据本次 job 的描述信息,计算出须要的 maptask 实例数量,而后向集群申请机器启动相应数量的 maptask 进程
(2) maptask 进程启动以后,根据给定的数据切片(哪一个文件的哪一个偏移量范围)范围进行数 据处理,主体流程为:
    A、 利用客户指定的 inputformat 来获取 RecordReader 读取数据,造成输入 KV 对
    B、 将输入 KV 对传递给客户定义的 map()方法,作逻辑运算,并将 map()方法输出的 KV 对收 集到缓存
    C、 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件 (超过缓存内存写到磁盘临时文件,最后都写到该文件,ruduce 获取该文件后,删除 )
(3) MRAppMaster 监控到全部 maptask 进程任务完成以后(真实状况是,某些 maptask 进 程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指 定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据
分区)
(4) Reducetask 进程启动以后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台 maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行从新归并排序, 而后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运
算输出的结果 KV,而后调用客户指定的 outputformat 将结果数据输出到外部存储

      三、maptask并行度决定机制

maptask 的并行度决定 map 阶段的任务处理并发度,进而影响到整个 job 的处理速度 那么, mapTask 并行实例是否越多越好呢?其并行度又是如何决定呢?

一个 job 的 map 阶段并行度由客户端在提交 job 时决定, 客户端对 map 阶段并行度的规划
的基本逻辑为:
将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分红逻辑上的多 个 split),而后每个 split 分配一个 mapTask 并行实例处理
这段逻辑及造成的切片规划描述文件,是由 FileInputFormat实现类的 getSplits()方法完成的。
该方法返回的是 List<InputSplit>, InputSplit 封装了每个逻辑切片的信息,包括长度和位置  信息,而 getSplits()方法返回一组 InputSplit

       四、切片机制

     五、maptask并行度经验之谈

若是硬件配置为 2*12core + 64G,恰当的 map 并行度是大约每一个节点 20-100 个 map,最好 每一个 map 的执行时间至少一分钟。
     (1)若是 job 的每一个 map 或者 reduce task 的运行时间都只有 30-40 秒钟,那么就减小该 job 的 map 或者 reduce 数,每个 task(map|reduce)的 setup 和加入到调度器中进行调度,这个 中间的过程可能都要花费几秒钟,因此若是每一个 task 都很是快就跑完了,就会在 task 的开
始和结束的时候浪费太多的时间。
配置 task 的 JVM 重用能够改善该问题:
mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最多能够顺序执行的 task 数目(属于同一个 Job)是 1。也就是说一个 task 启一个 JVM。这个值能够在 mapred-site.xml 中进行更改, 当设置成多个,就意味着这多个 task 运行在同一个 JVM 上,但不是同时执行,
是排队顺序执行
   (2)若是 input 的文件很是的大,好比 1TB,能够考虑将 hdfs 上的每一个 blocksize 设大,好比 设成 256MB 或者 512MB
     六、reducetask并行度决定机制

 补充:

package com.ghgj.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountMR {

	/**
	 * 该main方法是该mapreduce程序运行的入口,其中用一个Job类对象来管理程序运行时所须要的不少参数:
	 * 好比,指定用哪一个组件做为数据读取器、数据结果输出器
	 *     指定用哪一个类做为map阶段的业务逻辑类,哪一个类做为reduce阶段的业务逻辑类
	 *     指定wordcount job程序的jar包所在路径
	 *     ....
	 *     以及其余各类须要的参数
	 */
	public static void main(String[] args) throws Exception {
		// 指定hdfs相关的参数
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
//		conf.set("mapreduce.framework.name", "yarn");
//		conf.set("yarn.resourcemanager.hostname", "hadoop04");
		
		Job job = Job.getInstance(conf);
		
		// 设置jar包所在路径
		job.setJarByClass(WordCountMR.class);
		
		// 指定mapper类和reducer类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		// 指定maptask的输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 指定reducetask的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
//		Path inputPath = new Path("d:/wordcount/input");
//		Path outputPath = new Path("d:/wordcount/output");
		
		// 指定该mapreduce程序数据的输入和输出路径
		Path inputPath = new Path("/wordcount/input");
		Path outputPath = new Path("/wordcount/output");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
//		job.submit();
		// 最后提交任务
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion?0:1);
	}
	
	/**
	 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
	 * 
	 * KEYIN 是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量,因此key的类型是Long 
	 * VALUEIN 是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容,因此value的类型是String
	 * KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,咱们输出的key是单词,因此是String
	 * VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,咱们输出的value是单词的数量,因此是Integer
	 * 
	 * 可是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提升序列化效率,自定义了一套序列化框架
	 * 因此,在hadoop的程序中,若是该数据须要进行序列化(写磁盘,或者网络传输),就必定要用实现了hadoop序列化框架的数据类型
	 * 
	 * Long ----> LongWritable
	 * String ----> Text
	 * Integer ----> IntWritable
	 * Null ----> NullWritable
	 */
	static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
			String[] words = value.toString().split(" ");
			for(String word: words){
				context.write(new Text(word), new IntWritable(1));
			}
		}
	}
	
	/**
	 * 首先,和前面同样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出
	 * reducetask在调咱们写的reduce方法,reducetask应该收到了前一阶段(map阶段)中全部maptask输出的数据中的一部分
	 * (数据的key.hashcode%reducetask数==本reductask号),因此reducetaks的输入类型必须和maptask的输出类型同样
	 * 
	 * reducetask将这些收到kv数据拿来处理时,是这样调用咱们的reduce方法的:
	 * 先将本身收到的全部的kv对按照k分组(根据k是否相同)
	 * 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中全部的v用一个迭代器传给reduce方法的变量values
	 */
	static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			
			int sum = 0;
			for(IntWritable v: values){
				sum += v.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}
}
相关文章
相关标签/搜索