Hadoop之MapReduce理论篇02

1. ReduceTask工做机制

1.1. 设置ReduceTask 

ReduceTask 的并行度一样影响整个 job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不一样,ReduceTask 数量的决定是能够直接手动设置:java

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

1.2. 注意

(1) 若是数据分布不均匀,就有可能在 Reduce 阶段产生数据倾斜;
(2) ReduceTask 数量并非任意设置,还要考虑业务逻辑需求,有些状况下,须要计算全局汇总结果,就只能有 1 个 ReduceTask;
(3) 具体多少个 ReduceTask,须要根据集群性能而定;
(4) 若是分区数不是 1,可是 ReduceTask 为 1,是否执行分区过程。答案是:不执行分区过程。由于在 MapTask 的源码中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于 1 确定不执行。算法

1.3. 实验:测试ReduceTask多少合适

(1) 实验环境:1 个 master 节点,16 个 slave 节点: CPU:8GHZ , 内存: 2G,MapTask 数量为 16,测试数据量为 1G;
(2) 实验结论:apache

Reduce task服务器

1网络

5并发

10app

15框架

16分布式

20ide

25

30

45

60

总时间

892

146

110

92

88

100

128

101

145

104

1.4. ReduceTask工做机制

(1) Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,若是其大小超过必定阈值,则写到磁盘上,不然直接放到内存中;
(2) Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多;
(3) Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行汇集的一组数据。为了将 key 相同的数据聚在一块儿,Hadoop 采用了基于排序的策略。因为各个 MapTask 已经实现对本身的处理结果进行了局部排序,所以,ReduceTask 只需对全部数据进行一次归并排序便可;
(4) Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。

2. 自定义OutputFormat

2.1. 基本概念

要在一个 mapreduce 程序中根据数据的不一样输出两类结果到不一样目录,这类灵活的输出需求能够经过自定义 outputformat 来实现:
(1) 自定义 outputformat;
(2) 改写 recordwriter,具体改写输出数据的方法 write();

2.2. 案例实操

2.2.1. 需求

使用自定义 OutputFormat 实现过滤日志及自定义日志输出路径:
过滤输入的 log 日志中是否包含 bigdata
    (1)包含 bigdata 的日志输出到 e:/bigdata.log
    (2)不包含 bigdata 的日志输出到 e:/other.log

2.2.2. 代码实现

(1) 自定义一个 outputformat

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		// 建立一个RecordWriter
		return new FilterRecordWriter(job);
	}
}

(2) 具体的写数据 RecordWriter

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream bigdataOut = null;
	FSDataOutputStream otherOut = null;

	public FilterRecordWriter(TaskAttemptContext job) {
		// 1 获取文件系统
		FileSystem fs;

		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 建立输出文件路径
			Path bigdataPath = new Path("e:/bigdata.log");
			Path otherPath = new Path("e:/other.log");

			// 3 建立输出流
			bigdataOut = fs.create(bigdataPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {

		// 判断是否包含“bigdata”输出到不一样文件
		if (key.toString().contains("bigdata")) {
			bigdataOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// 关闭资源
		if (bigdataOut!= null) {
			bigdataOut.close();
		}
		
		if (otherOut != null) {
			otherOut.close();
		}
	}
}

(3) 编写 FilterMapper

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1 获取一行
		String line = value.toString();
		
		k.set(line);
		
		// 3 写出
		context.write(k, NullWritable.get());
	}
}

(4) 编写 FilterReducer

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {

		String k = key.toString();
		k = k + "\r\n";

		context.write(new Text(k), NullWritable.get());
	}
}

(5) 编写 FilterDriver

package com.test.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 要将自定义的输出格式组件设置到job中
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 虽然咱们自定义了outputformat,可是由于咱们的outputformat继承自fileoutputformat
		// 而fileoutputformat要输出一个_SUCCESS文件,因此,在这还得指定一个输出目录
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

3. MapReduce数据压缩

3.1. 基本概念 

      压缩技术可以有效减小底层存储系统 (HDFS) 读写字节数。压缩提升了网络带宽和磁盘空间的效率。在 Hadood 下,尤为是数据规模很大和工做负载密集的状况下,使用数据压缩显得很是重要。在这种状况下, I/O 操做和网络数据传输要花大量的时间。还有,Shuffle 与 Merge 过程一样也面临着巨大的 I/O 压力。
      鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘 I/O 和网络传输很是有帮助。不过,尽管压缩与解压操做的 CPU 开销不高,其性能的提高和资源的节省并不是没有代价。
      若是磁盘 I/O 和网络带宽影响了 MapReduce 做业性能,在任意 MapReduce 阶段启用压缩均可以改善端到端处理时间并减小 I/O 和网络流量。
      压缩 MapReduce 的一种优化策略:经过压缩编码对 Mapper 或者 Reducer 的输出进行压缩,以减小磁盘 IO,提升MR程序运行速度(但相应增长了 CPU 运算负担)
      注意:压缩特性运用得当能提升性能,但运用不当也可能下降性能
      基本原则:
(1) 运算密集型的 job,少用压缩
(2) IO 密集型的 job,多用压缩

3.2. MR 支持的压缩编码

压缩格式

工具

算法

文件扩展名

是否可切分

DEFAULT

DEFAULT

.deflate

Gzip

gzip

DEFAULT

.gz

bzip2

bzip2

bzip2

.bz2

LZO

lzop

LZO

.lzo

LZ4

LZ4

.lz4

Snappy

Snappy

.snappy

为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码类,以下表所示:

压缩格式

对应的编码/解码类

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

LZO

com.hadoop.compression.lzo.LzopCodec

LZ4

org.apache.hadoop.io.compress.Lz4Codec

Snappy

org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较:

压缩算法

原始文件大小

压缩文件大小

压缩速度

解压速度

gzip

8.3GB

1.8GB

17.5MB/s

58MB/s

bzip2

8.3GB

1.1GB

2.4MB/s

9.5MB/s

LZO-bset

8.3GB

2GB

4MB/s

60.6MB/s

LZO

8.3GB

2.9GB

49.3MB/s

74.6MB/s

3.3. 采用压缩的位置

(1) 输入压缩:
      在有大量数据并计划重复处理的状况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。Hadoop 自动检查文件扩展名,若是扩展名可以匹配,就会用恰当的编解码方式对文件进行压缩和解压。不然,Hadoop 就不会使用任何编解码。
(2) 压缩 mapper 输出:
当 map 任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据 Shuffle 过程,而 Shuffle 过程在 Hadoop 处理过程当中是资源消耗最多的环节。若是发现数据量大形成网络传输缓慢,应该考虑使用压缩技术。可用于压缩 mapper 输出的快速编解码包括 LZO、LZ4 或者 Snappy。
      注:LZO 是供 Hadoop 压缩数据用的通用压缩编解码。其设计目标是达到与硬盘读取速度至关的压缩速度,所以速度是优先考虑的因素,而不是压缩率。与 gzip 编解码相比,它的压缩速度是 gzip 的 5 倍,而解压速度是 gzip 的 2 倍。同一个文件用 LZO 压缩后比用 gzip 压缩后大 50%,但比压缩前小 25%~50%。这对改善性能很是有利,map 阶段完成时间快4倍。
(3) 压缩 reducer 输出:
      在此阶段启用压缩技术可以减小要存储的数据量,所以下降所需的磁盘空间。当 mapreduce 做业造成做业链条时,由于第二个做业的输入也已压缩,因此启用压缩一样有效。

4. Yarn资源管理

4.1. 基本概念

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,至关于一个分布式的操做系统平台,而 mapreduce 等运算程序则至关于运行于操做系统之上的应用程序。

4.2. Yarn 的重要概念

(1) Yarn 并不清楚用户提交的程序的运行机制;
(2) Yarn 只提供运算资源的调度 (用户程序向 Yarn 申请资源,Yarn 就负责分配资源);
(3) Yarn 中的主管角色叫 ResourceManager;
(4) Yarn 中具体提供运算资源的角色叫NodeManager;
(5) 这样一来,Yarn 其实就与运行的用户程序彻底解耦,就意味着 Yarn 上能够运行各类类型的分布式运算程序 (mapreduce 只是其中的一种),好比 mapreduce、storm 程序,spark程序等;
(6) 因此,spark、storm 等运算框架均可以整合在 Yarn 上运行,只要他们各自的框架中有符合 Yarn 规范的资源请求机制便可;
(7) Yarn 就成为一个通用的资源调度平台,今后,企业中之前存在的各类运算集群均可以整合在一个物理集群上,提升资源利用率,方便数据共享。

4.3. Yarn 的工做机制

(0) Mr 程序提交到客户端所在的节点;
(1) YarnRunner 向 Resourcemanager 申请一个 application;
(2) RM 将该应用程序的资源路径返回给 YarnRunner;
(3) 该程序将运行所需资源提交到 HDFS 上;
(4) 程序资源提交完毕后,申请运行 MRAppMaster;
(5) RM 将用户的请求初始化成一个 Task;
(6) 其中一个 NodeManager 领取到 Task 任务;
(7) 该 NodeManager 建立容器 Container,并产生 MRAppmaster;
(8) Container 从 HDFS 上拷贝资源到本地;
(9) MRAppmaster 向 RM 申请运行 MapTask 容器;
(10) RM 将运行 MaptTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并建立容器;
(11) MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序;
(12) MRAppmaster 向 RM 申请 2 个容器,运行 ReduceTask;
(13) ReduceTask 向 MapTask 获取相应分区的数据;
(14) 程序运行完毕后,MR 会向 RM 注销本身。
 

本文为原创文章,若是对你有一点点的帮助,别忘了点赞哦!比心!如需转载,请注明出处,谢谢!

相关文章
相关标签/搜索