ReduceTask 的并行度一样影响整个 job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不一样,ReduceTask 数量的决定是能够直接手动设置:java
// 默认值是1,手动设置为4 job.setNumReduceTasks(4);
(1) 若是数据分布不均匀,就有可能在 Reduce 阶段产生数据倾斜;
(2) ReduceTask 数量并非任意设置,还要考虑业务逻辑需求,有些状况下,须要计算全局汇总结果,就只能有 1 个 ReduceTask;
(3) 具体多少个 ReduceTask,须要根据集群性能而定;
(4) 若是分区数不是 1,可是 ReduceTask 为 1,是否执行分区过程。答案是:不执行分区过程。由于在 MapTask 的源码中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于 1 确定不执行。算法
(1) 实验环境:1 个 master 节点,16 个 slave 节点: CPU:8GHZ , 内存: 2G,MapTask 数量为 16,测试数据量为 1G;
(2) 实验结论:apache
Reduce task服务器 |
1网络 |
5并发 |
10app |
15框架 |
16分布式 |
20ide |
25 |
30 |
45 |
60 |
总时间 |
892 |
146 |
110 |
92 |
88 |
100 |
128 |
101 |
145 |
104 |
(1) Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,若是其大小超过必定阈值,则写到磁盘上,不然直接放到内存中;
(2) Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多;
(3) Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行汇集的一组数据。为了将 key 相同的数据聚在一块儿,Hadoop 采用了基于排序的策略。因为各个 MapTask 已经实现对本身的处理结果进行了局部排序,所以,ReduceTask 只需对全部数据进行一次归并排序便可;
(4) Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。
要在一个 mapreduce 程序中根据数据的不一样输出两类结果到不一样目录,这类灵活的输出需求能够经过自定义 outputformat 来实现:
(1) 自定义 outputformat;
(2) 改写 recordwriter,具体改写输出数据的方法 write();
使用自定义 OutputFormat 实现过滤日志及自定义日志输出路径:
过滤输入的 log 日志中是否包含 bigdata
(1)包含 bigdata 的日志输出到 e:/bigdata.log
(2)不包含 bigdata 的日志输出到 e:/other.log
(1) 自定义一个 outputformat
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 建立一个RecordWriter return new FilterRecordWriter(job); } }
(2) 具体的写数据 RecordWriter
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream bigdataOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 建立输出文件路径 Path bigdataPath = new Path("e:/bigdata.log"); Path otherPath = new Path("e:/other.log"); // 3 建立输出流 bigdataOut = fs.create(bigdataPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“bigdata”输出到不一样文件 if (key.toString().contains("bigdata")) { bigdataOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 if (bigdataOut!= null) { bigdataOut.close(); } if (otherOut != null) { otherOut.close(); } } }
(3) 编写 FilterMapper
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); k.set(line); // 3 写出 context.write(k, NullWritable.get()); } }
(4) 编写 FilterReducer
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { String k = key.toString(); k = k + "\r\n"; context.write(new Text(k), NullWritable.get()); } }
(5) 编写 FilterDriver
package com.test.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 虽然咱们自定义了outputformat,可是由于咱们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,因此,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
压缩技术可以有效减小底层存储系统 (HDFS) 读写字节数。压缩提升了网络带宽和磁盘空间的效率。在 Hadood 下,尤为是数据规模很大和工做负载密集的状况下,使用数据压缩显得很是重要。在这种状况下, I/O 操做和网络数据传输要花大量的时间。还有,Shuffle 与 Merge 过程一样也面临着巨大的 I/O 压力。
鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘 I/O 和网络传输很是有帮助。不过,尽管压缩与解压操做的 CPU 开销不高,其性能的提高和资源的节省并不是没有代价。
若是磁盘 I/O 和网络带宽影响了 MapReduce 做业性能,在任意 MapReduce 阶段启用压缩均可以改善端到端处理时间并减小 I/O 和网络流量。
压缩 MapReduce 的一种优化策略:经过压缩编码对 Mapper 或者 Reducer 的输出进行压缩,以减小磁盘 IO,提升MR程序运行速度(但相应增长了 CPU 运算负担)
注意:压缩特性运用得当能提升性能,但运用不当也可能下降性能
基本原则:
(1) 运算密集型的 job,少用压缩
(2) IO 密集型的 job,多用压缩
压缩格式 |
工具 |
算法 |
文件扩展名 |
是否可切分 |
DEFAULT |
无 |
DEFAULT |
.deflate |
否 |
Gzip |
gzip |
DEFAULT |
.gz |
否 |
bzip2 |
bzip2 |
bzip2 |
.bz2 |
是 |
LZO |
lzop |
LZO |
.lzo |
否 |
LZ4 |
无 |
LZ4 |
.lz4 |
否 |
Snappy |
无 |
Snappy |
.snappy |
否 |
为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码类,以下表所示:
压缩格式 |
对应的编码/解码类 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
LZ4 |
org.apache.hadoop.io.compress.Lz4Codec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较:
压缩算法 |
原始文件大小 |
压缩文件大小 |
压缩速度 |
解压速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO-bset |
8.3GB |
2GB |
4MB/s |
60.6MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
(1) 输入压缩:
在有大量数据并计划重复处理的状况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。Hadoop 自动检查文件扩展名,若是扩展名可以匹配,就会用恰当的编解码方式对文件进行压缩和解压。不然,Hadoop 就不会使用任何编解码。
(2) 压缩 mapper 输出:
当 map 任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据 Shuffle 过程,而 Shuffle 过程在 Hadoop 处理过程当中是资源消耗最多的环节。若是发现数据量大形成网络传输缓慢,应该考虑使用压缩技术。可用于压缩 mapper 输出的快速编解码包括 LZO、LZ4 或者 Snappy。
注:LZO 是供 Hadoop 压缩数据用的通用压缩编解码。其设计目标是达到与硬盘读取速度至关的压缩速度,所以速度是优先考虑的因素,而不是压缩率。与 gzip 编解码相比,它的压缩速度是 gzip 的 5 倍,而解压速度是 gzip 的 2 倍。同一个文件用 LZO 压缩后比用 gzip 压缩后大 50%,但比压缩前小 25%~50%。这对改善性能很是有利,map 阶段完成时间快4倍。
(3) 压缩 reducer 输出:
在此阶段启用压缩技术可以减小要存储的数据量,所以下降所需的磁盘空间。当 mapreduce 做业造成做业链条时,由于第二个做业的输入也已压缩,因此启用压缩一样有效。
Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,至关于一个分布式的操做系统平台,而 mapreduce 等运算程序则至关于运行于操做系统之上的应用程序。
(1) Yarn 并不清楚用户提交的程序的运行机制;
(2) Yarn 只提供运算资源的调度 (用户程序向 Yarn 申请资源,Yarn 就负责分配资源);
(3) Yarn 中的主管角色叫 ResourceManager;
(4) Yarn 中具体提供运算资源的角色叫NodeManager;
(5) 这样一来,Yarn 其实就与运行的用户程序彻底解耦,就意味着 Yarn 上能够运行各类类型的分布式运算程序 (mapreduce 只是其中的一种),好比 mapreduce、storm 程序,spark程序等;
(6) 因此,spark、storm 等运算框架均可以整合在 Yarn 上运行,只要他们各自的框架中有符合 Yarn 规范的资源请求机制便可;
(7) Yarn 就成为一个通用的资源调度平台,今后,企业中之前存在的各类运算集群均可以整合在一个物理集群上,提升资源利用率,方便数据共享。
(0) Mr 程序提交到客户端所在的节点;
(1) YarnRunner 向 Resourcemanager 申请一个 application;
(2) RM 将该应用程序的资源路径返回给 YarnRunner;
(3) 该程序将运行所需资源提交到 HDFS 上;
(4) 程序资源提交完毕后,申请运行 MRAppMaster;
(5) RM 将用户的请求初始化成一个 Task;
(6) 其中一个 NodeManager 领取到 Task 任务;
(7) 该 NodeManager 建立容器 Container,并产生 MRAppmaster;
(8) Container 从 HDFS 上拷贝资源到本地;
(9) MRAppmaster 向 RM 申请运行 MapTask 容器;
(10) RM 将运行 MaptTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并建立容器;
(11) MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序;
(12) MRAppmaster 向 RM 申请 2 个容器,运行 ReduceTask;
(13) ReduceTask 向 MapTask 获取相应分区的数据;
(14) 程序运行完毕后,MR 会向 RM 注销本身。