Hadoop(17)-MapReduce框架原理-MapReduce流程,Shuffle机制,Partition分区

MapReduce工做流程

1.准备待处理文件html

2.job提交前生成一个处理规划apache

3.将切片信息job.split,配置信息job.xml和咱们本身写的jar包交给yarn网络

4.yarn根据切片规划计算出MapTask的数量app

(以一个MapTask为例)oop

5.Maptask调用inputFormat生成RecordReader,将本身处理的切片文件内容打散成K,V值性能

6.MapTask将打散好的K,V值交给Mapper,Mapper通过一系列的处理将KV值写出大数据

7.写出的KV值被outputCollector收集起来,写入一个在内存的环形缓冲区ui

8,9.当环形缓冲区被写入的空间等于80%时,会触发溢写.此时数据是在内存中,因此在溢写以前,会对数据进行排序,是一个二次排序的快排(先根据分区排序再根据key排序).而后将数据有序的写入到磁盘上.spa

缓冲区为何是环形的?这样作是为了能够在缓冲区的任何地方进行数据的写入.3d

当第一次溢写时,数据会从余下的20%空间中的中间位置,再分左右继续写入,也就是从第一次是从上往下写数据变成了从下往上写数据

 

10,11.当屡次溢写产生多个有序的文件后,会触发归并排序,将多个有序的文件合并成一个有序的大文件.当文件数>=10个时,会触发归并排序,取文件的一小部分放入内存的缓冲区,再生成一个小文件部分大小x文件数的缓冲区,逐个比较放入大文件缓冲区,依次比较下去,再将大文件缓冲写入到磁盘,归并结束后将大文件放在文件列表的末尾,继续重复此动做,直到合并成一个大文件.这次归并排序的时间复杂度要求较低.

12.当全部的MapTask执行完任务后,启动相应数量的ReduceTask,并告知每个ReduceTask应该处理的数据分区

13.ReduceTask将指定分区的文件下载到本地,若有多个分区文件的话,ReduceTask上将会有多个大文件,再一次归并排序,造成一个大文件.

14.15,若是有分组要求的话,ReduceTask会将数据一组一组的交给Reduce,处理完后准备将数据写出

16.Reduce调用output生成RecordWrite将数据写入到指定路径中

 

Shuffle机制

上图中,数据从Mapper写出来以后到数据进入到Reduce以前,这一阶段就叫作Shuffle

 

Shuffle时,会有三次排序,第一次是数据从环形缓冲区写入到磁盘时,会有一次快排,第二次是在MapTask中,将多个分区且内部有序的小文件归并成一个分区且内部有序的大文件,第三次是在ReduceTask中,从多个MapTask中获取指定分区的大文件,再进行一个归并排序,合并成一个大文件.

以WordCount为例,试想一下,在第一次从环形缓冲区写入到磁盘时,排好序的数据为(w1,1),(w1,1),(w1,1),(w2,1),(w2,1),(w3,1),这样的数据会增长网络传输量,因此在这里可使用Combiner进行数据合并.最后造成的数据是(w1,3),(w2,2),(w3,1),后续会详细讲解~

Partition分区

将Mapper想象成一个水池,数据是池里的水.默认分一个区,只有一根水管.若是只有一个ReduceTask,则水会所有顺着惟一的水管流入到ReduceTask中.若是此时有3根水管,则水会被分红三股水流流入到3个ReduceTask中,并且哪些水进哪一个水管,并不受咱们主观控制,也就是数据处理速度加快了~~Partition分区就决定了分几根水管.试想一下,若是有4根水管,末端只有3个ReduceTask,那么有一股水流会丢失.也就是形成数据丢失,程序会报错.若是只有2根水管,那么则有一个ReduceTask无事可作,最后生成的是一个空文件,浪费资源

因此,通常来讲,有几个ReduceTask就要分几个区,至于partition和ReduceTask设置为几,要看集群性能,数据集,业务,经验等等~

对应流程图上,也就是从环形缓冲区写入到磁盘时,会分区

 

 

collector出现了,除了将key,value收集到缓冲区中以外,还收集了partition分区

 

 

key.hashCode() & Integer.MAX_VALUE,保证取余前的数为正数

好比,numReduceTasks = 3, 一个数n对3取余,结果会有0,1,2三种可能,也就是分三个区,再一次印证了要 reduceTask number = partition number 

默认分区是根据key的hashcode和reduceTasks的个数取模获得的,用户没法控制哪一个key存储到哪一个分区上

案例演练

以12小章的统计流量案例为例,大数据-Hadoop生态(12)-Hadoop序列化和源码追踪

将手机号13六、13七、13八、139开头都分别放到一个独立的4个文件中,其余开头的放到一个文件中

 自定义Partition类

package com.atguigu.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //1. 截取手机前三位
        String start = text.toString().substring(0, 3);

        //2. 按照手机号前三位返回分区号
        switch (start) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }


    }
}

Driver类的main()中增长如下代码

job.setPartitionerClass(MyPartitioner.class);

job.setNumReduceTasks(5);

输出结果,5个文件 

若是job.setNumReduceTasks(10),会生成10个文件,其中5个是空文件

 若是job.setNumReduceTasks(2),程序会直接执行失败报异常

若是job.setNumReduceTasks(1),程序会运行成功,由于若是numReduceTasks=1时,根本就不会执行分区的过程

 

 若是是如下状况,也会执行失败.MapReduce会认为你分了41个区,因此分区号必须从0开始,逐一累加.

job.setNumReduceTasks(5)

switch (start) { case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 40; }

相关文章
相关标签/搜索