Hadoop之MapReduce基础

一。MapReduce概念

  Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;前端

  Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。java

1.1 为何要MapReduce

  1)海量数据在单机上处理由于硬件资源限制,没法胜任node

  2)而一旦将单机版程序扩展到集群来分布式运行,将极大增长程序的复杂度和开发难度算法

  3)引入mapreduce框架后,开发人员能够将绝大部分工做集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。数据库

1.2 MapReduce核心思想

1)分布式的运算程序每每须要分红至少2个阶段apache

2)第一个阶段的maptask并发实例,彻底并行运行,互不相干编程

3)第二个阶段的reduce task并发实例互不相干,可是他们的数据依赖于上一个阶段的全部maptask并发实例的输出windows

4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,若是用户的业务逻辑很是复杂,那就只能多个mapreduce程序,串行运行缓存

1.3 MapReduce进程

一个完整的mapreduce程序在分布式运行时有三类实例进程:安全

1)MrAppMaster:负责整个程序的过程调度及状态协调

2)MapTask:负责map阶段的整个数据处理流程

3)ReduceTask:负责reduce阶段的整个数据处理流程

1.4 MapReduce编程规范

用户编写的程序分红三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)

1)Mapper阶段

       (1)用户自定义的Mapper要继承本身的父类

       (2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

       (3)Mapper中的业务逻辑写在map()方法中

       (4)Mapper的输出数据是KV对的形式(KV的类型可自定义)

       (5)map()方法(maptask进程)对每个<K,V>调用一次

2)Reducer阶段

       (1)用户自定义的Reducer要继承本身的父类

       (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

       (3)Reducer的业务逻辑写在reduce()方法中

       (4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法

3)Driver阶段

  整个程序须要一个Drvier来进行提交,提交的是一个描述了各类必要信息的job对象

4)案例实操

       详见3.1.1统计一堆文件中单词出现的个数(WordCount案例)

1.5 MapReduce程序运行流程分析

 

1)在MapReduce程序读取文件的输入目录上存放相应的文件。

2)客户端程序在submit()方法执行前,获取待处理的数据信息,而后根据集群中参数的配置造成一个任务分配规划。

3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。

4)MRAppMaster启动后根据本次job的描述信息,计算出须要的maptask实例数量,而后向集群申请机器启动相应数量的maptask进程。

5)maptask利用客户指定的inputformat来读取数据,造成输入KV对。

6)maptask将输入KV对传递给客户定义的map()方法,作逻辑运算

7)map()运算完毕后将KV对收集到maptask缓存。

8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件

9)MRAppMaster监控到全部maptask进程任务完成以后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。

10)Reducetask进程启动以后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行从新归并排序,而后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。

11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。

二。MapReduce理论篇

2.1 Writable序列化

  序列化就是把内存中的对象,转换成字节序列(或其余数据传输协议)以便于存储(持久化)和网络传输。 

  反序列化就是将收到字节序列(或其余数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

  Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带不少额外的信息(各类校验信息,header,继承体系等),不便于在网络中高效传输。因此,hadoop本身开发了一套序列化机制(Writable),精简、高效。

2.1.1 经常使用数据序列化类型

经常使用的数据类型对应的hadoop数据序列化类型

Java类型

Hadoop Writable类型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

2.1.2 自定义bean对象实现序列化接口

1)自定义bean对象要想序列化传输,必须实现序列化接口,须要注意如下7项。

(1)必须实现Writable接口

(2)反序列化时,须要反射调用空参构造函数,因此必须有空参构造

(3)重写序列化方法

(4)重写反序列化方法

(5)注意反序列化的顺序和序列化的顺序彻底一致

(6)要想把结果显示在文件中,须要重写toString(),且用”\t”分开,方便后续用

(7)若是须要将自定义的bean放在key中传输,则还须要实现comparable接口,由于mapreduce框中的shuffle过程必定会对key进行排序

// 1 必须实现Writable接口
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    //2 反序列化时,须要反射调用空参构造函数,因此必须有
    public FlowBean() {
        super();
    }

    /**
     * 3重写序列化方法
     * 
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 4 重写反序列化方法 
5 注意反序列化的顺序和序列化的顺序彻底一致
     * 
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // 6要想把结果显示在文件中,须要重写toString(),且用”\t”分开,方便后续用
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    //7 若是须要将自定义的bean放在key中传输,则还须要实现comparable接口,由于mapreduce框中的shuffle过程必定会对key进行排序
    @Override
    public int compareTo(FlowBean o) {
        // 倒序排列,从大到小
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
}

2)案例

  详见3.2.1统计每个手机号耗费的总上行流量、下行流量、总流量(序列化)。

2.2 InputFormat数据切片机制

2.2.1 FileInputFormat切片机制

1)job提交流程源码详解

waitForCompletion()
submit();
// 1创建链接
    connect();    
        // 1)建立提交job的代理
        new Cluster(getConfiguration());
            // (1)判断是本地yarn仍是远程
            initialize(jobTrackAddr, conf); 
    // 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)建立给集群提交数据的Stag路径
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // 2)获取jobid ,并建立job路径
    JobID jobId = submitClient.getNewJobID();
    // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);    
    rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
    maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);
// 5)向Stag路径写xml配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);
// 6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2)FileInputFormat源码解析(input.getSplits(job))

  (1)找到你数据存储的目录。

       (2)开始遍历处理(规划切片)目录下的每个文件

       (3)遍历第一个文件ss.txt

              a)获取文件大小fs.sizeOf(ss.txt);

              b)计算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M

    c)默认状况下,切片大小=blocksize

              d)开始切,造成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)

              e)将切片信息写到一个切片规划文件中

              f)整个切片的核心过程在getSplit()方法中完成。

    g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分红分片进行存储。InputSplit只记录了分片的元数据信息,好比起始位置、长度以及所在的节点列表等。

    h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。

       (4)提交切片规划文件到yarn上,yarn上的MrAppMaster就能够根据切片规划文件计算开启maptask个数。

3)FileInputFormat中默认的切片机制:

  (1)简单地按照文件的内容长度进行切片

  (2)切片大小,默认等于block大小

  (3)切片时不考虑数据集总体,而是逐个针对每个文件单独切片

  好比待处理数据有两个文件:

file1.txt    320M
file2.txt    10M

  通过FileInputFormat的切片机制运算后,造成的切片信息以下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

4)FileInputFormat切片大小的参数配置

  (1)经过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 

  切片主要由这几个值来运算决定

  mapreduce.input.fileinputformat.split.minsize=1 默认值为1

  mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue

  所以,默认状况下,切片大小=blocksize。

  maxsize(切片最大值):参数若是调得比blocksize小,则会让切片变小,并且就等于配置的这个参数的值。

  minsize (切片最小值):参数调的比blockSize大,则可让切片变得比blocksize还大。

5)获取切片信息API

// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();

2.2.2 CombineTextInputFormat切片机制

关于大量小文件的优化策略

  1)默认状况下TextInputformat对任务的切片机制是按文件规划切片,无论文件多小,都会是一个单独的切片,都会交给一个maptask,这样若是有大量小文件,就会产生大量的maptask,处理效率极其低下。

  2)优化策略

         (1)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS作后续分析。

         (2)补救措施:若是已是大量小文件在HDFS中了,可使用另外一种InputFormat来作切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不一样:它能够将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就能够交给一个maptask。

         (3)优先知足最小切片大小,不超过最大切片大小

                CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

                CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

         举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具体实现步骤

// 9 若是不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

4)案例

  详见3.1.4 需求4:大量小文件的切片优化(CombineTextInputFormat)。

2.2.3 自定义InputFormat

1)概述

  (1)自定义一个InputFormat

  (2)改写RecordReader,实现一次读取一个完整文件封装为KV

  (3)在输出时使用SequenceFileOutPutFormat输出合并文件

2)案例

       详见3.5小文件处理(自定义InputFormat)。

2.3 MapTask工做机制

1)问题引出

  maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,mapTask并行任务是否越多越好呢?

2)MapTask并行度决定机制

       一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。

3)MapTask工做机制

       (1)Read阶段:Map Task经过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

       (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

       (3)Collect阶段:在用户编写map()函数中,当数据处理完成后,通常会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

       (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。须要注意的是,将数据写入本地磁盘以前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操做。

       溢写阶段详情:

       步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,而后按照key进行排序。这样,通过排序后,数据以分区为单位汇集在一块儿,且同一分区内全部数据按照key有序。

       步骤2:按照分区编号由小到大依次将每一个分区中的数据写入任务工做目录下的临时文件output/spillN.out(N表示当前溢写次数)中。若是用户设置了Combiner,则写入文件以前,对每一个分区中的数据进行一次汇集操做。

       步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每一个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。若是当期内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

       (5)Combine阶段:当全部数据处理完成后,MapTask对全部临时文件进行一次合并,以确保最终只会生成一个数据文件。

       当全部数据处理完后,MapTask会将全部临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

       在进行文件合并过程当中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件从新加入待合并列表中,对文件排序后,重复以上过程,直到最终获得一个大文件。

       让每一个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

2.4 Shuffle机制

2.4.1 Shuffle机制

  Mapreduce确保每一个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出做为输入传给reducer)称为shuffle。

2.4.2 MapReduce工做流程

1)流程示意图

2)流程详解

  上面的流程是整个mapreduce最全工做流程,可是shuffle过程只是从第7步开始到第16步结束,具体shuffle过程详解,以下:

  1)maptask收集咱们的map()方法输出的kv对,放到内存缓冲区中

  2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

  3)多个溢出文件会被合并成大的溢出文件

  4)在溢出过程当中,及合并的过程当中,都要调用partitoner进行分组和针对key进行排序

  5)reducetask根据本身的分区号,去各个maptask机器上取相应的结果分区数据

  6)reducetask会取到同一个分区的来自不一样maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)

  7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

3)注意

  Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

  缓冲区的大小能够经过参数调整,参数:io.sort.mb  默认100M

2.4.3 partition分区

0)问题引出:要求将统计结果按照条件输出到不一样文件中(分区)。好比:将统计结果按照手机归属地不一样省份输出到不一样文件中(分区)

1)默认partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

  默认分区是根据key的hashCode对reduceTasks个数取模获得的。用户无法控制哪一个key存储到哪一个分区

2)自定义Partitioner步骤

       (1)自定义类继承Partitioner,从新getPartition()方法

    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判断是哪一个省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

  (2)在job驱动中,设置自定义partitioner:

job.setPartitionerClass(CustomPartitioner.class)

  (3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);

3)注意:

  若是reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

  若是1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

  若是reduceTask的数量=1,则无论mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

         例如:假设自定义分区数为5,则

    (1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件

    (2)job.setNumReduceTasks(2);会报错

    (3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

4)案例

  详见3.2.2 需求2:将统计结果按照手机归属地不一样省份输出到不一样文件中(Partitioner)

  详见3.1.2 需求2:把单词按照ASCII码奇偶分区(Partitioner)

2.4.4 排序

  排序是MapReduce框架中最重要的操做之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操做属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而无论逻辑上是否须要。

       对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到必定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上全部文件进行一次合并,以将这些文件合并成一个大的有序文件。

       对于Reduce Task,它从每一个Map Task上远程拷贝相应的数据文件,若是文件大小超过必定阈值,则放到磁盘上,不然放到内存中。若是磁盘上文件数目达到必定阈值,则进行一次合并以生成一个更大文件;若是内存中文件大小或者数目超过必定阈值,则进行一次合并后将数据写到磁盘上。当全部数据拷贝完毕后,Reduce Task统一对内存和磁盘上的全部数据进行一次合并。

每一个阶段的默认排序

1)排序的分类:

       (1)部分排序:

MapReduce根据输入记录的键对数据集排序。保证输出的每一个文件内部排序。

       (2)全排序:

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,由于一台机器必须处理全部输出文件,从而彻底丧失了MapReduce所提供的并行架构。

       替代方案:首先建立一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:能够为上述文件建立3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

  (3)辅助排序:(GroupingComparator分组)

       Mapreduce框架在记录到达reducer以前按键对记录排序,但键所对应的值并无被排序。甚至在不一样的执行轮次中,这些值的排序也不固定,由于它们来自不一样的map任务且这些map任务在不一样轮次中完成时间各不相同。通常来讲,大多数MapReduce程序会避免让reduce函数依赖于值的排序。可是,有时也须要经过特定的方法对键进行排序和分组等以实现对值的排序。

2)自定义排序WritableComparable

  (1)原理分析

  bean对象实现WritableComparable接口重写compareTo方法,就能够实现排序

@Override
public int compareTo(FlowBean o) {
    // 倒序排列,从大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

  (2)案例

    详见3.2.3 需求3:将统计结果按照总流量倒序排序(排序)

2.4.5 GroupingComparator分组

  1)对reduce阶段的数据根据某一个或几个字段进行分组。

  2)案例

    详见3.3 求出每个订单中最贵的商品(GroupingComparator)

2.4.6 Combiner合并

  1)combiner是MR程序中Mapper和Reducer以外的一种组件

  2)combiner组件的父类就是Reducer

  3)combiner和reducer的区别在于运行的位置:

    Combiner是在每个maptask所在的节点运行

    Reducer是接收全局全部Mapper的输出结果;

  4)combiner的意义就是对每个maptask的输出进行局部汇总,以减少网络传输量

  5)自定义Combiner实现步骤:

  (1)自定义一个combiner继承Reducer,重写reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }
        context.write(key, new IntWritable(count));
    }
}

  (2)在job中设置:

job.setCombinerClass(WordcountCombiner.class);

  6)combiner可以应用的前提是不能影响最终的业务逻辑,并且,combiner的输出kv应该跟reducer的输入kv类型要对应起来

    Mapper

    3 5 7 ->(3+5+7)/3=5

    2 6 ->(2+6)/2=4

    Reducer

    (3+5+7+2+6)/5=23/5    不等于    (5+4)/2=9/2

  7)案例

    详见3.1.3需求3:对每个maptask的输出局部汇总(Combiner)

2.4.7 数据倾斜&Distributedcache

1)数据倾斜缘由

  若是是多张表的操做都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

2)案例:

  详见3.4.1 需求1:reduce端表合并(数据倾斜)

3)解决方案

  在map端缓存多张表,提早处理业务逻辑,这样增长map端业务,减小reduce端数据的压力,尽量的减小数据倾斜。

4)具体办法:采用distributedcache

       (1)在mapper的setup阶段,将文件读取到缓存集合中

       (2)在驱动函数中加载缓存。

    job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点

5)案例:

  详见3.4.2需求2:map端表合并(Distributedcache)

2.5 ReduceTask工做机制

1)设置ReduceTask

  reducetask的并行度一样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不一样,Reducetask数量的决定是能够直接手动设置:

//默认值是1,手动设置为4
job.setNumReduceTasks(4);

2)注意

  (1)若是数据分布不均匀,就有可能在reduce阶段产生数据倾斜

  (2)reducetask数量并非任意设置,还要考虑业务逻辑需求,有些状况下,须要计算全局汇总结果,就只能有1个reducetask。

  (3)具体多少个reducetask,须要根据集群性能而定。

  (4)若是分区数不是1,可是reducetask为1,是否执行分区过程。答案是:不执行分区过程。由于在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1确定不执行。

4)ReduceTask工做机制

       (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,若是其大小超过必定阈值,则写到磁盘上,不然直接放到内存中。

       (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

       (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行汇集的一组数据。为了将key相同的数据聚在一块儿,Hadoop采用了基于排序的策略。因为各个MapTask已经实现对本身的处理结果进行了局部排序,所以,ReduceTask只需对全部数据进行一次归并排序便可。

       (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

2.6 自定义OutputFormat

1)概述

  (1)要在一个mapreduce程序中根据数据的不一样输出两类结果到不一样目录,这类灵活的输出需求能够经过自定义outputformat来实现。

  (1)自定义outputformat,

  (2)改写recordwriter,具体改写输出数据的方法write()

2)案例:

   详见3.6 修改日志内容及自定义日志输出路径(自定义OutputFormat)。

2.7 MapReduce数据压缩

2.7.1 概述

  压缩技术可以有效减小底层存储系统(HDFS)读写字节数。压缩提升了网络带宽和磁盘空间的效率。在Hadood下,尤为是数据规模很大和工做负载密集的状况下,使用数据压缩显得很是重要。在这种状况下,I/O操做和网络数据传输要花大量的时间。还有,Shuffle与Merge过程一样也面临着巨大的I/O压力。

       鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输很是有帮助。不过,尽管压缩与解压操做的CPU开销不高,其性能的提高和资源的节省并不是没有代价。

       若是磁盘I/O和网络带宽影响了MapReduce做业性能,在任意MapReduce阶段启用压缩均可以改善端到端处理时间并减小I/O和网络流量。

  压缩mapreduce的一种优化策略:经过压缩编码对mapper或者reducer的输出进行压缩,以减小磁盘IO提升MR程序运行速度(但相应增长了cpu运算负担)

  注意:压缩特性运用得当能提升性能,但运用不当也可能下降性能

  基本原则:

  (1)运算密集型的job,少用压缩

  (2)IO密集型的job,多用压缩

2.7.2 MR支持的压缩编码

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,以下表所示

压缩性能的比较

2.7.3 采用压缩的位置

  压缩能够在MapReduce做用的任意阶段启用。

1)输入压缩:

       在有大量数据并计划重复处理的状况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。Hadoop自动检查文件扩展名,若是扩展名可以匹配,就会用恰当的编解码方式对文件进行压缩和解压。不然,Hadoop就不会使用任何编解码器。

2)压缩mapper输出:

  当map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据Shuffle过程,而Shuffle过程在Hadoop处理过程当中是资源消耗最多的环节。若是发现数据量大形成网络传输缓慢,应该考虑使用压缩技术。可用于压缩mapper输出的快速编解码器包括LZO、LZ4或者Snappy。

  注:LZO是供Hadoop压缩数据用的通用压缩编解码器。其设计目标是达到与硬盘读取速度至关的压缩速度,所以速度是优先考虑的因素,而不是压缩率。与gzip编解码器相比,它的压缩速度是gzip的5倍,而解压速度是gzip的2倍。同一个文件用LZO压缩后比用gzip压缩后大50%,但比压缩前小25%~50%。这对改善性能很是有利,map阶段完成时间快4倍。

3)压缩reducer输出:

       在此阶段启用压缩技术可以减小要存储的数据量,所以下降所需的磁盘空间。当mapreduce做业造成做业链条时,由于第二个做业的输入也已压缩,因此启用压缩一样有效。

2.7.4 压缩配置参数

  要在Hadoop中启用压缩,能够配置以下参数(mapred-site.xml文件中):

参数

默认值

阶段

建议

io.compression.codecs  

(在core-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,

org.apache.hadoop.io.compress.Lz4Codec

输入压缩

Hadoop使用文件扩展名判断是否支持某种编解码器

mapreduce.map.output.compress

false

mapper输出

这个参数设为true启用压缩

mapreduce.map.output.compress.codec

org.apache.hadoop.io.compress.DefaultCodec

mapper输出

使用LZO、LZ4或snappy编解码器在此阶段压缩数据

mapreduce.output.fileoutputformat.compress

false

reducer输出

这个参数设为true启用压缩

mapreduce.output.fileoutputformat.compress.codec

org.apache.hadoop.io.compress. DefaultCodec

reducer输出

使用标准工具或者编解码器,如gzip和bzip2

mapreduce.output.fileoutputformat.compress.type

RECORD

reducer输出

SequenceFile输出使用的压缩类型:NONE和BLOCK

2.8 计数器应用

  Hadoop为每一个做业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

1)API

       (1)采用枚举的方式统计计数

    enum MyCounter{MALFORORMED,NORMAL}

    //对枚举定义的自定义计数器加1

    context.getCounter(MyCounter.MALFORORMED).increment(1);

  (2)采用计数器组、计数器名称的方式统计

    context.getCounter("counterGroup", "countera").increment(1);

              组名和计数器名称随便起,但最好有意义。

       (3)计数结果在程序运行后的控制台上查看。

2)案例

  详见3.6 修改日志内容及自定义日志输出路径(自定义OutputFormat)。

2.9 数据清洗

1)概述

  在运行Mapreduce程序以前,每每要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程每每只须要运行mapper程序,不须要运行reduce程序。

2)案例

  详见3.7 日志清洗(数据清洗)。

2.10 MapReduce与Yarn

2.10.1 Yarn概述

  Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,至关于一个分布式的操做系统平台,而mapreduce等运算程序则至关于运行于操做系统之上的应用程序

2.10.2 Yarn的重要概念

  1)Yarn并不清楚用户提交的程序的运行机制

  2)Yarn只提供运算资源的调度(用户程序向Yarn申请资源,Yarn就负责分配资源)

  3)Yarn中的主管角色叫ResourceManager

  4)Yarn中具体提供运算资源的角色叫NodeManager

  5)这样一来,Yarn其实就与运行的用户程序彻底解耦,就意味着Yarn上能够运行各类类型的分布式运算程序(mapreduce只是其中的一种),好比mapreduce、storm程序,spark程序……

  6)因此,spark、storm等运算框架均可以整合在Yarn上运行,只要他们各自的框架中有符合Yarn规范的资源请求机制便可

  7)Yarn就成为一个通用的资源调度平台,今后,企业中之前存在的各类运算集群均可以整合在一个物理集群上,提升资源利用率,方便数据共享

2.10.3 Yarn工做机制

1)Yarn运行机制

2)工做机制详解

       (0)Mr程序提交到客户端所在的节点

       (1)yarnrunner向Resourcemanager申请一个application。

       (2)rm将该应用程序的资源路径返回给yarnrunner

       (3)该程序将运行所需资源提交到HDFS上

       (4)程序资源提交完毕后,申请运行mrAppMaster

       (5)RM将用户的请求初始化成一个task

       (6)其中一个NodeManager领取到task任务。

       (7)该NodeManager建立容器Container,并产生MRAppmaster

       (8)Container从HDFS上拷贝资源到本地

       (9)MRAppmaster向RM 申请运行maptask容器

       (10)RM将运行maptask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并建立容器。

       (11)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动maptask,maptask对数据分区排序。

       (12)MRAppmaster向RM申请2个容器,运行reduce task。

       (13)reduce task向maptask获取相应分区的数据。

       (14)程序运行完毕后,MR会向RM注销本身。

2.11 做业提交全过程

2.12 MapReduce开发总结

  mapreduce在编程的时候,基本上一个固化的模式,没有太多可灵活改变的地方,除了如下几处:

1)输入数据接口:InputFormat--->FileInputFormat(文件类型数据读取的通用抽象类)  DBInputFormat (数据库数据读取的通用抽象类)

    默认使用的实现类是:TextInputFormat

    job.setInputFormatClass(TextInputFormat.class)

    TextInputFormat的功能逻辑是:一次读一行文本,而后将该行的起始偏移量做为key,行内容做为value返回

2)逻辑处理接口: Mapper 

    彻底须要用户本身去实现其中:map()   setup()   clean()

3)map输出的结果在shuffle阶段会被partition以及sort,此处有两个接口可自定义:

    (1)Partitioner

         有默认实现 HashPartitioner,逻辑是  根据key和numReduces来返回一个分区号; key.hashCode()&Integer.MAXVALUE % numReduces

         一般状况下,用默认的这个HashPartitioner就能够,若是业务上有特别的需求,能够自定义

       (2)Comparable

         当咱们用自定义的对象做为key来输出时,就必需要实现WritableComparable接口,override其中的compareTo()方法

4)reduce端的数据分组比较接口:Groupingcomparator

       reduceTask拿到输入数据(一个partition的全部数据)后,首先须要对数据进行分组,其分组的默认原则是key相同,而后对每一组kv数据调用一次reduce()方法,而且将这一组kv中的第一个kv的key做为参数传给reduce的key,将这一组数据的value的迭代器传给reduce()的values参数

       利用上述这个机制,咱们能够实现一个高效的分组取最大值的逻辑:

       自定义一个bean对象用来封装咱们的数据,而后改写其compareTo方法产生倒序排序的效果

       而后自定义一个Groupingcomparator,将bean对象的分组逻辑改为按照咱们的业务分组id来分组(好比订单号)

       这样,咱们要取的最大值就是reduce()方法中传进来key

5)逻辑处理接口:Reducer

       彻底须要用户本身去实现其中  reduce()   setup()   clean()   

6)输出数据接口:OutputFormat---> 有一系列子类FileOutputformat  DBoutputFormat  .....

       默认实现类是TextOutputFormat,功能逻辑是:将每个KV对向目标文本文件中输出为一行

2.13 MapReduce参数优化

2.13.1 资源相关参数

1)如下参数是在用户本身的mr应用程序中配置就能够生效

2)应该在yarn启动以前就配置在服务器的配置文件中才能生效

配置参数

参数说明

yarn.scheduler.minimum-allocation-mb   1024

给应用程序container分配的最小内存

yarn.scheduler.maximum-allocation-mb   8192

给应用程序container分配的最大内存

yarn.scheduler.minimum-allocation-vcores   1

 

yarn.scheduler.maximum-allocation-vcores  32

 

yarn.nodemanager.resource.memory-mb   8192

 

3)shuffle性能优化的关键参数,应在yarn启动以前就配置好

配置参数

参数说明

mapreduce.task.io.sort.mb   100

shuffle的环形缓冲区大小,默认100m

mapreduce.map.sort.spill.percent   0.8

环形缓冲区溢出的阈值,默认80%

2.13.2 容错相关参数

配置参数

参数说明

mapreduce.map.maxattempts

每一个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.reduce.maxattempts

每一个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.map.failures.maxpercent

当失败的Map Task失败比例超过该值为,整个做业则失败,默认值为0. 若是你的应用程序容许丢弃部分输入数据,则该该值设为一个大于0的值,好比5,表示若是有低于5%的Map Task失败(若是一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个做业扔认为成功。

mapreduce.reduce.failures.maxpercent

当失败的Reduce Task失败比例超过该值为,整个做业则失败,默认值为0。

mapreduce.task.timeout

Task超时时间,常常须要设置的一个参数,该参数表达的意思为:若是一个task在必定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,多是卡住了,也许永远会卡主,为了防止由于用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。若是你的程序对每条输入数据的处理时间过长(好比会访问数据库,经过网络拉取数据等),建议将该参数调大,该参数太小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

三。MapReduce实战篇

3.1 WordCount案例

3.1.1 需求1:统计一堆文件中单词出现的个数(WordCount案例)

1)数据准备:

[root@master001 jarHadoop]# hadoop fs -cat /ghh/input/hello.txt
i go now
for you said you like me once
to the time to life
rather than to life in time to the time to life
rather than to life in time
life had a lot of things is futile
but we still want to experience
good relationships just happen
they take time
patience and two people who truly want to be together
[root@master001 jarHadoop]#

2)分析

  按照mapreduce编程规范,分别编写Mapper,Reducer,Driver。

3)编写程序

  (1)定义一个mapper类

package com.WordCount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * KEYIN:默认状况下,是mr框架所读到的一行文本的起始偏移量,Long;
 * 在hadoop中有本身的更精简的序列化接口,因此不直接用Long,而是用LongWritable
 * VALUEIN:默认状况下,是mr框架所读到的一行文本内容,String;此处用Text
 *
 * KEYOUT:是用户自定义逻辑处理完成以后输出数据中的key,在此处是单词,String;此处用Text
 * VALUEOUT,是用户自定义逻辑处理完成以后输出数据中的value,在此处是单词次数,Integer,此处用IntWritable
 * @author Administrator
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    /**
     * map阶段的业务逻辑就写在自定义的map()方法中
     * maptask会对每一行输入数据调用一次咱们自定义的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 将maptask传给咱们的文本内容先转换成String
        String line = value.toString();

        // 2 根据空格将这一行切分红单词
        String[] words = line.split(" ");

        // 3 将单词输出为<单词,1>
        for(String word:words){
            // 将单词做为key,将次数1做为value,以便于后续的数据分发,能够根据单词分发,以便于相同单词会到相同的reducetask中
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

  (2)定义一个reducer类

package com.WordCount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * KEYIN , VALUEIN 对应mapper输出的KEYOUT, VALUEOUT类型
 *
 * KEYOUT,VALUEOUT 对应自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VALUEOUT是总次数
 * @author Administrator
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * key,是一组相同单词kv对的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;

        // 1 汇总各个key的个数
        for(IntWritable value:values){
            count +=value.get();
        }

        // 2输出该key的总次数
        context.write(key, new IntWritable(count));
    }
}

  (3)定义一个主类,用来描述job并提交job

package com.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 至关于一个yarn集群的客户端,
 * 须要在此封装咱们的mr程序相关运行参数,指定jar包
 * 最后提交给yarn
 * @author Administrator
 */
public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        // 8 配置提交到yarn上运行,windows和Linux变量不一致
//        configuration.set("mapreduce.framework.name", "yarn");
//        configuration.set("yarn.resourcemanager.hostname", "hadoop103");
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径
//        job.setJar("/home/gh/wc.jar");
        job.setJarByClass(WordcountDriver.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
//        job.submit();
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

  (4)将程序打成jar包,而后拷贝到hadoop集群中。

  (5)启动hadoop集群

  (6)执行wordcount程序

  [root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.WordCount.WordcountDriver /ghh/inPut /ghh/output

3.1.2 需求2:把单词按照ASCII码奇偶分区(Partitioner)

0)分析

1)自定义分区

package com.WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {

        // 1 获取单词key
        String firWord = key.toString().substring(0, 1);
        char[] charArray = firWord.toCharArray();
        int result = charArray[0];
        // int result  = key.toString().charAt(0);

        // 2 根据奇数偶数分区
        if (result % 2 == 0) {
            return 0;
        }else {
            return 1;
        }
    }
}

2)在驱动中配置加载分区,设置reducetask个数

job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);

3.1.3 需求3:对每个maptask的输出局部汇总(Combiner)

0)需求:统计过程当中对每个maptask的输出进行局部汇总,以减少网络传输量,即采用Combiner功能。

方案一

1)增长一个WordcountCombiner类继承Reducer

package com.WordCount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {

        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }

        context.write(key, new IntWritable(count));
    }
}

2)在WordcountDriver驱动类中指定combiner

// 9 指定须要使用combiner,以及用哪一个类做为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);

方案二

1)将WordcountReducer做为combiner在WordcountDriver驱动类中指定

// 9 指定须要使用combiner,以及用哪一个类做为combiner的逻辑
job.setCombinerClass(WordcountReducer.class);

运行程序

3.1.4 需求4:大量小文件的切片优化(CombineTextInputFormat)

 0)需求:将输入的大量小文件合并成一个切片统一处理。

1)输入数据:准备5个小文件

2)实现过程

  (1)不作任何处理,运行需求1中的wordcount程序,观察切片个数为5

  (2)在WordcountDriver中增长以下代码,运行程序,便可看到运行的切片个数为1

// 9 若是不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

3.2 流量汇总程序案例

3.2.1 需求1:统计手机号耗费的总上行流量、下行流量、总流量(序列化)

1)需求:

  统计每个手机号耗费的总上行流量、下行流量、总流量

2)数据准备

[root@master001 jarHadoop]# hadoop fs -cat /ghh/input1/phone_data.txt
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985066 13560436666 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 [root@master001 jarHadoop]#

输入数据格式:

1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99        18    15    1116        954        200
                   手机号码                                                                   上行流量     下行流量

输出数据格式

13560436666         1116              954             2070
手机号码             上行流量           下行流量         总流量

3)分析

基本思路:

Map阶段:

1)读取一行数据,切分字段

2)抽取手机号、上行流量、下行流量

3)以手机号为keybean对象为value输出,即context.write(手机号,bean);

Reduce阶段:

1)累加上行流量和下行流量获得总流量。

2)实现自定义的bean来封装流量信息,并将bean做为map输出的key来传输

3MR程序在处理数据的过程当中会对数据排序(map输出的kv对传输到reduce以前,会排序),排序的依据是map输出的key

因此,咱们若是要实现本身须要的排序规则,则能够考虑将排序因素放到key中,key实现接口:WritableComparable而后重写keycompareTo方法。

4)编写mapreduce程序

       1)编写流量统计的bean对象

package com.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// bean对象要实例化
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,须要反射调用空参构造函数,因此必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法
     注意反序列化的顺序和序列化的顺序彻底一致
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

  (2)编写mapreduce主程序

package com.flowsum;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1 将一行内容转成string
            String ling = value.toString();

            String newstr1=ling.replaceAll(" ",","); //先把全部空格替换成 逗号。
            String newstr2=newstr1.replaceAll("\t",",");  //再把全部的制表符替换成逗号。
            String newstr3=newstr2.replaceAll(",+", ","); //把全部重复的逗号合并成一个逗号。

            // 2 切分字段
            String[] fields = newstr3.split(",");

            // 3 取出手机号码
            String phoneNum = fields[1];

            // 4 取出上行流量和下行流量
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            // 5 写出数据
            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_downFlow = 0;

            // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_downFlow += bean.getDownFlow();
            }

            // 2 封装对象
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowCount.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

  (3)将程序打成jar包,而后拷贝到hadoop集群中。

  (4)启动hadoop集群

  (5)执行flowcount程序

    [root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCount /ghh/input1 /ghh/output2

  (6)查看结果

3.2.2 需求2:将统计结果按照手机归属地不一样省份输出到不一样文件中(Partitioner)

0)需求:将统计结果按照手机归属地不一样省份输出到不一样文件中(分区)

1)数据准备,同上面需求1的数据

2)分析

  (1)Mapreduce中会将map输出的kv对,按照相同key分组,而后分发给不一样的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发

  (2)若是要按照咱们本身的需求进行分组,则须要改写数据分发(分组)组件Partitioner

自定义一个CustomPartitioner继承抽象类:Partitioner

  (3)在job驱动中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)

3)在需求1的基础上,增长一个分区类

package com.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2 V2 对应的是map输出kv类型
 * @author Administrator
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);

        int partition = 4;

        // 2 判断是哪一个省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

2)在驱动函数中增长自定义数据分区设置和reduce task设置

      // 8 指定自定义数据分区
        job.setPartitionerClass(ProvincePartitioner.class);
        
        // 9 同时指定相应数量的reduce task
        job.setNumReduceTasks(5); 

3)将程序打成jar包,而后拷贝到hadoop集群中。

4)启动hadoop集群

5)执行flowcountPartitionser程序

  [root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCount /ghh/input1 /ghh/output3

6)查看结果

3.2.3 需求3:将统计结果按照总流量倒序排序(排序)

0)需求

  根据需求1产生的结果再次对总流量进行排序。

1)数据准备,同需求1数据

2)分析

       (1)把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序

       (2)context.write(总流量,手机号)

       (3)FlowBean实现WritableComparable接口重写compareTo方法

@Override
public int compareTo(FlowBean o) {
    // 倒序排列,从大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

3)FlowBean对象在在需求1基础上增长了比较功能

package com.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean1 implements WritableComparable<FlowBean1> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,须要反射调用空参构造函数,因此必须有
    public FlowBean1() {
        super();
    }

    public FlowBean1(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法 注意反序列化的顺序和序列化的顺序彻底一致
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

 @Override public int compareTo(FlowBean1 o) {
        // 倒序排列,从大到小
        return this.sumFlow > o.getSumFlow() ? -1 : 1; }
}

4)Map方法优化为一个对象,reduce方法则直接输出结果便可,驱动函数根据输入输出重写配置便可。

package com.flowsum;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {
    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean1, Text>{
        FlowBean1 bean = new FlowBean1();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // 1 拿到的是上一个统计程序输出的结果,已是各手机号的总流量信息
            String line = value.toString();

            String newstr1=line.replaceAll(" ",","); //先把全部空格替换成 逗号。
            String newstr2=newstr1.replaceAll("\t",",");  //再把全部的制表符替换成逗号。
            String newstr3=newstr2.replaceAll(",+", ","); //把全部重复的逗号合并成一个逗号。

            // 2 截取字符串并获取电话号、上行流量、下行流量
            String[] fields = newstr3.split(",");
            String phoneNbr = fields[0];

            // 4 取出上行流量和下行流量
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            // 3 封装对象
            bean.set(upFlow, downFlow);
            v.set(phoneNbr);

            // 4 输出
            context.write(bean, v);
        }
    }

    static class FlowCountSortReducer extends Reducer<FlowBean1, Text, Text, FlowBean1>{

        @Override
        protected void reduce(FlowBean1 bean, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowCountSort.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(FlowBean1.class);
        job.setMapOutputValueClass(Text.class);

        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean1.class);

        // 5 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        Path outPath = new Path(args[1]);
//        FileSystem fs = FileSystem.get(configuration);
//        if (fs.exists(outPath)) {
//            fs.delete(outPath, true);
//        }
        FileOutputFormat.setOutputPath(job, outPath);

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5)将程序打成jar包,而后拷贝到hadoop集群中。

6)启动hadoop集群

7)执行flowcountsort程序

  [root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCountSort /ghh/input1 /ghh/output4

8)查看结果

3.3 求每一个订单中最贵的商品(GroupingComparator)

1)需求

  有以下订单数据

订单id

商品id

成交金额

Order_0000001

Pdt_01

222.8

Order_0000001

Pdt_05

25.8

Order_0000002

Pdt_03

522.8

Order_0000002

Pdt_04

122.4

Order_0000002

Pdt_05

722.4

Order_0000003

Pdt_01

222.8

Order_0000003

Pdt_02

33.8

  如今须要求出每个订单中最贵的商品。

2)输入数据

[root@master001 jarHadoop]# hadoop fs -cat /ghh/input2/GroupingComparator.txt0000001 Pdt_01  222.8
0000002 Pdt_05  722.4
0000001 Pdt_05  25.8
0000003 Pdt_01  222.8
0000003 Pdt_01  33.8
0000002 Pdt_03  522.8
0000002 Pdt_04  122.4
[root@master001 jarHadoop]#

3)分析

  (1)利用“订单id和成交金额”做为key,能够将map阶段读取到的全部订单数据按照id分区,按照金额排序,发送到reduce。

  (2)在reduce端利用groupingcomparator将订单id相同的kv聚合成组,而后取第一个便是最大值。

4)实现

定义订单信息OrderBean

package com.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private String orderId;
    private double price;

    public OrderBean() {
        super();
    }

    public OrderBean(String orderId, double price) {
        super();
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public int compareTo(OrderBean o) {
        // 1 先按订单id排序(从小到大)
        int result = this.orderId.compareTo(o.getOrderId());

        if (result == 0) {
            // 2 再按金额排序(从大到小)
            result = price > o.getPrice() ? -1 : 1;
        }

        return result;
    }
    @Override
    public String toString() {
        return orderId + "\t" + price ;
    }
}

编写OrderSortMapper处理流程

package com.order;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value,
                       Context context)throws IOException, InterruptedException {
        // 1 获取一行数据
        String line = value.toString();

        String newstr1=line.replaceAll(" ",","); //先把全部空格替换成 逗号。
        String newstr2=newstr1.replaceAll("\t",",");  //再把全部的制表符替换成逗号。
        String newstr3=newstr2.replaceAll(",+", ","); //把全部重复的逗号合并成一个逗号。

        // 2 截取字段
        String[] fields = newstr3.split(",");

        // 3 封装bean
        bean.setOrderId(fields[0]);
        bean.setPrice(Double.parseDouble(fields[2]));

        // 4 写出
        context.write(bean, NullWritable.get());
    }
}

编写OrderSortReducer处理流程

package com.order;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
    @Override
    protected void reduce(OrderBean bean, Iterable<NullWritable> values,
                          Context context) throws IOException, InterruptedException {
        // 直接写出
        context.write(bean, NullWritable.get());
    }
}

编写OrderSortDriver处理流程

package com.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderSortDriver {

    public static void main(String[] args) throws Exception {
        // 1 获取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 设置jar包加载路径
        job.setJarByClass(OrderSortDriver.class);

        // 3 加载map/reduce类
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReducer.class);

        // 4 设置map输出数据key和value类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 设置最终输出数据的key和value类型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 设置输入数据和输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 10 设置reduce端的分组
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);

        // 7 设置分区
        job.setPartitionerClass(OrderSortPartitioner.class);

        // 8 设置reduce个数
        job.setNumReduceTasks(3);

        // 9 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

编写OrderSortPartitioner处理流程

package com.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {

        return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

编写OrderSortGroupingComparator处理流程

package com.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderSortGroupingComparator extends WritableComparator {

    protected OrderSortGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;

        // 将orderId相同的bean都视为一组
        return abean.getOrderId().compareTo(bbean.getOrderId());
    }
}

5)将程序打成jar包,而后拷贝到hadoop集群中。

6)启动hadoop集群

7)执行order程序

  [root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.order.OrderSortDriver /ghh/input2 /ghh/output5

8)查看结果

3.4 MapReduce中多表合并案例

相关文章
相关标签/搜索