大数据技术栈在当下已是比较成熟的了,Hadoop 做为大数据存储的基石,其重要程度不言而喻,做为一个想从 java 后端转向大数据开发的程序员来讲,打好 Hadoop 基础,就至关于夯实建造房屋的地基,本文以上图结构为基本,旨在帮助你们快速了解 Hadoop 运行机制。css
HDFS 篇java
HDFS就是你们熟知的分布式存储的文件系统,它包括 3 个组件,结构以下图:node
NameNode 至关于 Master 节点,它是管理者;nginx
DataNode 是 Slave,是执行实际操做的节点;程序员
SecondryNameNode并非 NameNode的热备,它是辅助 NameNode工做的。算法
下面咱们来从图示的几个方面来进行详细阐述,如图:数据库
HDFS 写数据流程编程
客户端向 NameNode 发送上传文件请求,附带上传路径 ;后端
NameNode 检查路径是否存在若是已经存在则报出“路径已存在”错误,反正通知客户端能够上传文件;缓存
客户端将文件根据拆分后,请求上传第一个块(0-128M),向NameNode 查找可上传的 DataNode 信息;
NameNode 返回可用的 DataNode 信息,按副本个数返回可用DataNode 个数;
客户端收到 DataNode 信息后,请求与其中一个 DataNode 创建 block 传输通道,创建成功后,第二个 DataNode 与此 DataNode 创建 block 通道,第三个 DataNode 与第二个 DataNode 创建 block 通道,以此类推,DataNode 之间逐级创建通道完成
通道创建完成后,从最后一个 DataNode 向倒数第二个 DataNode 应答通道创建成功,倒数第二个再向它前一个DataNode 应答通道创建成功,直到第一个 DataNode 应答客户端通道创建成功;
客户端开始传输数据,将block 拆成一个个的 package 传输,当传到第一个 DataNode 时,将 package 放到内存中,边将package 存储,边将内存中的数据传输到下一个 DataNode,以此方法传到最后一个DataNode。
这样作的好处是:加快传输速度并减小其余DataNode等待时间提升传输效率。
当第一个块传输完成时,再请求传输第二个block 重复 3-8 步。
HDFS 读取数据流程
客户端向 NameNode 请求下载 xxx 路径下的文件;
NameNode 返回目标文件的元数据信息,即文件在 DataNode 上存储位置,因为一个文件是拆分存储在不一样的DataNode上,因此客户端须要与不一样DataNode 进行交互;
客户端请求读取第一个 DataNode数据;
第一个 DataNode 数据向客户端传输数据 block_1
第一个DataNode 数据读取完成后,向第二个 DataNode请求读取数据,第二个DataNode 想客户端传输文件 block_2
重复以上步骤,直到读取文件完成
NameNode 与 SecondryNameNode 工做机制
Hadoop在第一次启动时须要格式化 NameNode,NameNode在格式化后会建立 fsimage和 edits ,若是不是第一次启动则直接加载编辑日志和镜像文件;
客户端对元数据进行增删改的请求;
NameNode 记录操做日志,更新滚动日志;
NameNode 在内存中对数据进行增删改查;
SecondaryNameNode 询问 NameNode 是否须要checkpoint,直接带回 NameNode 是否检查结果;
SecondaryNameNode 请求执行 checkpoint;
NameNode 滚动正在写的 edits 日志;
将滚动前的编辑日志和镜像文件拷贝到 SecondaryNameNode;
SecondaryNameNode 加载编辑日志和镜像文件到内存,并合并;
生成新的镜像文件 fsimage.chkpoint;
拷贝f simage.chkpoint 到 NameNode;
NameNode 将 fsimage.chkpoint 从新命名成 fsimage。
fsimage 与 edits 以及其余文件介绍:
fsimage文件:HDFS 文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的全部目录和文件 idnode 的序列化信息。
edits文件:存放HDFS文件系统的全部更新操做的路径,文件系统客户端执行的全部写操做首先会被记录到edits文件中。
seen_txid文件保存的是一个数字,就是最后一个edits_的数字
每次 NameNod e启动的时候都会将 fsimage 文件读入内存,并将 edits里面的更新操做,保证内存中的元数据信息是最新的、同步的,能够当作 NameNode 启动的时候就将 fsimage 和 edits 文件进行了合并。
checkpoint 检查点设置
SecondaryNameNode 每隔一小时执行一次检查,当操做次数达到1百万时,执行一次检查。具体配置在 hdfs-default.xml 中。
DataNode 工做机制
DataNode 启动后向 NameNode 注册,经过后,周期性(1小时)的向NameNode 上报全部的块信息。
心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另外一台机器,或删除某个数据块。若是超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用,不会立刻断定为该节点死亡。
HDFS 默认的超时时长为 10 分钟+30 秒。若是定义超时时间为 timeout,则超时时长的计算公式为:
timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
而默认的 dfs.namenode.heartbeat.recheck-interval 大小为 5 分钟,dfs.heartbeat.interval 默认为 3 秒。
安全模式
NameNode 启动时,首先将映像文件(fsimage)载入内存,并执行编辑日志(edits)中的各项操做,这是须要必定时间的,此时 NameNode 运行在安全模式,即 NameNode 的文件系统对于客户端来讲是只读的。
系统中的数据块的位置并非由 NameNode 维护的,而是以块列表的形式存储在 DataNode 中。
在系统的正常操做期间,NameNode 会在内存中保留全部块位置的映射信息。在安全模式下,各个 DataNode 会向 NameNode 发送最新的块列表信息,NameNode 了解到足够多的块位置信息以后,便可高效运行文件系统。
MapReduce篇
mapreduce 是一种并行计算的框架,它是 hadoop 并行计算的基础,下面咱们从图示的几个方面来开始 MR 的介绍
MR 思想
分布式的运算程序每每须要分红至少 2 个阶段。
第一个阶段的 maptask 并发实例,彻底并行运行,互不相干。
第二个阶段的 reduce task 并发实例互不相干,可是他们的数据依赖于上一个阶段的全部 maptask 并发实例的输出。
MapReduce 编程模型只能包含一个 map 阶段和一个 reduce 阶段,若是用户的业务逻辑很是复杂,那就只能多个 mapreduce 程序,串行运行。
MR 编程规范与大致流程
Mapper 阶段
用户自定义的 Mapper 要继承本身的父类
Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
Mapper 中的业务逻辑写在 map() 方法中
Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
map() 方法(maptask 进程)对每个<K,V>调用一次
Reducer 阶段
用户自定义的 Reducer 要继承本身的父类
Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
Reducer 的业务逻辑写在 reduce() 方法中
Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce() 方法
Driver阶段
至关于 yarn 集群的客户端,用于提交咱们整个程序到 yarn 集群,提交的是封装了 mapreduce 程序相关运行参数的 job 对象
分片
分片机制:
简单地按照文件的内容长度进行切片
切片时不考虑数据集总体,而是逐个针对每个文件单独切片
切片大小,默认等于 block 大小,
切片判断,当剩下的文件大小大于切片大小的 1.1 倍时才进行切片
切片大小: Math.max(minSize, Math.min(maxSize,blockSize));
切片主要由这几个值来运算决定:
mapreduce.input.fileinputformat.split.minsize = 1 默认值为 1
mapreduce.input.fileinputformat.split.maxsize = Long.MAXValue 默认值 Long.MAXValue
所以,默认状况下,切片大小 = blocksize。
输入流 FileInputFormat 与 自定义:
经常使用的输入流 TextFileInputFormat(按行读入),combinerTextInputFormat(读入时合并小分件)等
自定义输入流须要继承 FileInputFormat ,重写 createRecordReader 方法。
MapTask机制
MapTask 的并行度是由 切片的个数决定的,有多少个切片就会执行多少个 MapTask。 大体流程以下图:
Read阶段:MapTask 经过用户编写的 RecordReader,从输入 切片中解析出一个个 key/value。
Map 阶段:该节点主要是将解析出的 key/value 交给用户编写map()函数处理,并产生一系列新的 key/value。
Collect收集阶段:在用户编写 map( )函数中,当数据处理完成后,通常会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。须要注意的是,将数据写入本地磁盘以前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操做。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 partition 进行排序,而后按照 key 进行排序。这样,通过排序后,数据以分区为单位汇集在一块儿,且同一分区内全部数据按照 key 有序。
步骤2:按照分区编号由小到大依次将每一个分区中的数据写入任务工做目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。若是用户设置了 Combiner,则写入文件以前,对每一个分区中的数据进行一次汇集操做。
步骤3:将分区数据的元信息写到内存索引数据结构 SpillRecord中,其中每一个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。若是当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
5 .Combine阶段:当全部数据处理完成后,MapTask对全部临时文件 进行一次合并,以确保最终只会生成一个数据文件。
在进行文件合并过程当中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认100)个文件,并将产生的文件从新加入待合并列表中,对文件排序后,重复以上过程,直到最终获得一个大文件。
让每一个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
Shuffle 机制
Mapreduce 确保每一个 reducer 的输入都是按 key 排序的。系统执行排序的过程(即将 mapper 输出做为输入传给 reducer)称为 shuffle,如图下图所示。
shuffle 在map 阶段主要是进行 分区 、排序 、合并,在reduce 阶段主要进行 合并、排序。
Partition 分区
默认分区是根据 key 的 hashCode 和 Integer的最大值作与运算后对reduceTasks个数取模获得的。
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
使用默认规则咱们无法控制哪一个key存储到哪一个分区因此一般须要自定义分区规则,自定义分区的步骤很简单:
自定义类继承 Partitioner,重写 getPartition() 方法
在 job 驱动中,设置自定义partitioner,job.setPartitionClass(自定义类)
自定义 partition 后,要根据自定义 partitioner 的逻辑设置相应数量的 reduce task
此时要注意 分区个数与 reduceTask 个数的关系:
若是 reduceTask 的数量> 分区数,则会多产生几个空的输出文件;
若是 1< reduceTask 的数量< 分区数,则有一部分分区数据无处安放,程序会报错;
若是 reduceTask 的数量 =1,则无论 mapTask 端输出多少个分区文件,最终结果都交给这一个 reduceTask,最终也就只会产生一个结果文件,实际上这种状况是不会进行分区的。
排序
shuffle 过程当中按排序时机来看一共进行了4次排序:
缓冲区在写入磁盘前按天然序先对文件进行快速排序
在 combiner 阶段对文件进行合并排序,归并排序法
reduceTask 将 MapTask 的输出文件拷贝后进行归并排序
进入 reduce 方法前进行自定义排序,使用 GroupingComparato r组件
GroupingComparator 经常使用的例子是 自定义订单 comparator,根据订单id排序,大体步骤以下:
建立订单对象继承 WritableComparator ,重写 compare方法 返回1,-1,0
自定义GroupingComparator对象继承WritableComparator,重写compare方法,传入订单对象作比较
job.setGroupingComparatorClass(自定义GroupingComparator)
若有须要再根据订单id,自定义分区
合并
combiner 是 MR 程序中 Mapper 和 Reducer 以外的一种组件,它主要用场景是在溢写阶段在写入到磁盘前对文件先进行一次合并操做,须要注意的是应用 combiner 的前提是不能影响最终的业务逻辑
其实 combiner 组件的父类就是 Reducer,combiner 和 reducer 的区别在于运行的位置:
Combiner是在每个 maptask 所在的节点运行;它的意义就是对每个 maptask 的输出进行局部汇总,以减少网络传输量。
Reducer 是接收全局全部 Mapper 的输出结果;
combiner 的使用:
自定义 combiner 类继承 reducer ,重写 reduce 方法
在 Driver 中 job.setCombinerClass(自定义 combiner 类)
ReduceTask机制
reducetask的并行度一样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不一样,Reducetask 数量的决定是能够直接手动设置:
//默认值是1,手动设置为 5job.setNumReduceTasks(5);
reduce 个数若是是 0,则没有 reduce 过程输出文件数和 map 数一致
reduce 个数若是是 1,则输出一个文件,不会有分区过程
大体流程以下图所示:
Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,若是其大小超过必定阈值,则写到磁盘上,不然直接放到内存中。
Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行汇集的一组数据。为了将 key 相同的数据聚在一块儿,Hadoop 采用了基于排序的策略。因为各个 MapTask 已经实现对本身的处理结果进行了局部排序,所以,ReduceTask 只需对全部数据进行一次归并排序便可。
Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。(默认是HDFS,具体写到哪要看输出流)
输出流 FileOutputFormat 与 自定义:
经常使用的输入接口 TextOutputFormat(每条记录写成行,字符串输出),SequenceFileOutputFormat(顺序文件输出须要做为后序MR输入)等。
自定义输出流须要继承FileOutputFormat ,重写 getRecordWriter方法,返回自定义RecordWriter。
join
提到 join 最多的是使用 map join,缘由在于使用 reduce join 会形成 reduce 端压力大,容易产生数据倾斜,
使用map join的两种状况:
一张小表一张大表状况,小表驱动大表
在map端缓存多张表,提早处理业务逻辑,在map端多处理业务,减小reduce端压力,尽量减小数据倾斜
MR 总结
因为 MR 部份内容过多而且复杂,咱们用一张图来简单总结下核心知识点,具体描述在上文均有提到:
压缩
对于 Hadoop 来讲压缩特性运用得当能提升性能,但运用不当也可能下降性能。
有以下两个基本原则:
运算密集型的job,少用压缩
IO密集型的job,多用压缩
YARN
YARN 架构组成
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成,
ResourceManager
处理客户端请求
监控 NodeManager
启动或监控 ApplicationMaster
资源的分配与调度
NodeManager
管理节点资源
处理 ResourceManager 请求
处理 ApplicationMaster 请求
ApplicationMaster
数据切分
为应用程序申请资源并分配给内部的任务
任务监控与容错
Container
Container是 YARN 中资源抽象,封装了某个节点上的资源 内存 CPU 磁盘 网络
YARN做业机制
YARN 的做业机制是比较宏观的,它反映了一个 MR 任务从提交到集群到完成的整个周期,如图:
做业提交全过程详解
做业提交
1.client 调用 job.waitForCompletion 方法,向整个集群提交MapReduce 做业;
2.client 向 RM 申请一个做业 id;
3.RM 给 client 返回该 job 资源的提交路径和做业 id;
4.client 提交 jar 包、切片信息和配置文件到指定的资源提交路径;
5.client 提交完资源后,向 RM 申请运行 MrAppMaster;
做业初始化
6.当 RM 收到 client 的请求后,将该 job 添加到容量调度器中;
7.某一个空闲的 NM 领取到该 job;
8.该 NM 建立 Container,并产生 MRAppmaster;
9.下载 client 提交的资源到本地;
任务分配
10.MrAppMaster 向 RM 申请运行多个 maptask 任务资源;
11.RM 将运行 maptask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并建立容器;
任务运行
12.MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 maptask,maptask 对数据分区排序;
13.MrAppMaster 等待全部 maptask 运行完毕后,向 RM 申请容器,运行 reducetask ;
14.reduce task 向 maptask 获取相应分区的数据;
15.程序运行完毕后,MR 会向 RM 申请注销本身;
进度和状态更新
YARN 中的任务将其进度和状态返回给应用管理器, 客户端每秒向应用管理器请求进度更新, 展现给用户。
做业完成
除了向应用管理器请求做业进度外, 客户端每 5 分钟都会经过调用 waitForCompletion() 来检查做业是否完成。
做业完成以后, 应用管理器和 container 会清理工做状态。做业的信息会被做业历史服务器存储以备以后用户核查。
调度机制
hadoop 资源调度器分为 队列 FIFO(先进先出调度器)、容量调度器、公平调度器,默认使用的是容量调度器。
容量调度器
组成:
多个队列组成,每一个队列可配置必定资源,队列采用FIFO
对同一用户提交做业所需资源进行限定
算法:
1. 纵向排序- 先计算 每一个队列中正在运行的任务数与其应该分得的计算资源的比值,按比值从小到大给队列排序
2. 横向排序- 按照做业优先级和提交顺序,同时考虑用户资源限制和内存限制对该队列内任务排序
3. 不一样队列中排在第一位的任务同时执行
4. 队列中排在第一位的任务优先执行,其余任务依次执行
公平调度器
按缺额排序,缺额大者优先 ,缺额是指 job 在理想状况下须要的计算资源与实际获得的计算资源的差值
组成:
多个队列组成,每一个队列中每一个做业公平共享队列资源
算法:
1.同一个队列中,job的缺额越大,越先得到的资源,优先运行
2.做业按缺额的高低来前后执行:
a.同一队列中可能有多个任务同时执行
b.不一样对列中可能有多个任务同时执行
企业优化
最后咱们来谈谈hadoop在企业中经常使用的优化措施:
输入端小文件处理
1. 合并小文件,对小文件进行存档 ,将多个小文件打包成一个 HAR 文件,命令以下:
hadoop archive -archiveName zoo.har -p / foo / bar (打包路径) -r 3 (副本数)/ outputdir(打包目录)
2. 自定义 InputFormat 将小文件存储成 SequenceFile
3. 使用 CombineTextInputFormat 来是做为输入,解决输入端大量小文件场景,
4. 对于大量小文件 Job,能够开启 JVM 重用
开启前一个 map 运行一个 jvm,开启后在一个 map 在 jvm 上运行完毕后,jvm 继续运行其余 map。
具体设置:mapreduce.job.jvm.numtasks值在10-20之间。
Map阶段
1. 增大环形缓冲区大小,由100M扩大到200M
mapreduce.task.io.sort.mb(shuffle的环形缓冲区大小,默认100m)
2. 增大缓冲区溢写比例,由80%扩大到90%
mapreduce.map.sort.spill.percent (环形缓冲区溢出的阈值,默认80%)
3. 减小溢写文件的 merge 次数
4. 不影响实际业务的前提下,使用 combiner 进行合并,减小I/O
reduce阶段
1. 合理设置map和reduce数,不能太多特不能太少。太少会致使task等待,延长处理时间;太多致使map reduce任务间竞争,形成处理处理
2. 规避使用 reduce 由于reduce在用于链接数据集时会产生大量的网络消耗
3. 设置 map reduce共存,调整slowstart.completedmaps 参数,使map 运行到必定程度后,reduce 也开始运行,减小reduce 的等待时间
4 增长每一个reduce去map中拿数据的并行数
5 集群性能能够的前提下,增大reduce端存储数据内存的大小
IO传输
1. 采用数据压缩的方式,减小网络IO的的时间。安装Snappy和LZOP压缩编码器,
2. 使用SequenceFile二进制文件
总体优化,经常使用调优参数表
1. MapTask默认内存大小为1G,能够增长MapTask内存大小为4-5G
2. ReduceTask默认内存大小为1G,能够增长ReduceTask内存大小为4-5G
3. 能够增长MapTask的cpu核数,增长ReduceTask的cpu核数
4. 增长每一个container的cpu核数和内存大小
5. 调整每一个Map Task和Reduce Task最大重试次数
具体参数参考以下表:
资源相关参数
如下参数是在用户本身的mr应用程序中配置就能够生效(mapred-default.xml)
配置参数 |
参数说明 |
mapreduce.map.memory.mb |
一个Map Task可以使用的资源上限(单位:MB),默认为1024。若是Map Task实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.reduce.memory.mb |
一个Reduce Task可以使用的资源上限(单位:MB),默认为1024。若是Reduce Task实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.map.cpu.vcores |
每一个Map task可以使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.cpu.vcores |
每一个Reduce task可以使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.shuffle.parallelcopies |
每一个reduce去map中拿数据的并行数。默认值是5 |
mapreduce.reduce.shuffle.merge.percent |
buffer中的数据达到多少比例开始写入磁盘。默认值0.66 |
mapreduce.reduce.shuffle.input.buffer.percent |
buffer大小占reduce可用内存的比例。默认值0.7 |
mapreduce.reduce.input.buffer.percent |
指定多少比例的内存用来存放buffer中的数据,默认值是0.0 |
如下参数应该在yarn启动以前就配置在服务器的配置文件中才能生效(yarn-default.xml)
配置参数 |
参数说明 |
yarn.scheduler.minimum-allocation-mb |
给应用程序container分配的最小内存,默认值:1024 |
yarn.scheduler.maximum-allocation-mb |
给应用程序container分配的最大内存,默认值:8192 |
yarn.scheduler.minimum-allocation-vcores |
每一个container申请的最小CPU核数,默认值:1 |
yarn.scheduler.maximum-allocation-vcores |
每一个container申请的最大CPU核数,默认值:32 |
yarn.nodemanager.resource.memory-mb |
给containers分配的最大物理内存,默认值:8192 |
如下参数是 shuffle 性能优化的关键参数,应在yarn启动以前就配置好(mapred-default.xml)
配置参数 |
参数说明 |
mapreduce.task.io.sort.mb |
shuffle的环形缓冲区大小,默认100m |
mapreduce.map.sort.spill.percent |
环形缓冲区溢出的阈值,默认80% |
容错相关参数(mapreduce性能优化)
配置参数 |
参数说明 |
mapreduce.map.maxattempts |
每一个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.reduce.maxattempts |
每一个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.task.timeout |
Task超时时间,常常须要设置的一个参数,该参数表达的意思为:若是一个task在必定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,多是卡住了,也许永远会卡住,为了防止由于用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。若是你的程序对每条输入数据的处理时间过长(好比会访问数据库,经过网络拉取数据等),建议将该参数调大,该参数太小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。 |
参考资料:尚硅谷 hadoop 学习笔记
关注一下,我写的就更来劲儿啦