MapReduce高级特性

计数器

由于计数器的查看每每比查看集群日志来的方便快捷
因此有些状况下计数器信息比集群日志更加有效java

用户自定义的计数器

关于Hadoop的内置计数器的介绍能够参考Hadoop权威指南第九章MapReduce Features中的Build-in Counts小节
这里限于篇幅再也不说明node

MapReduce容许用户在程序中使用枚举或者字符串的格式类自定义计数器
一个做业能够定义的计数器不限,使用枚举类型时
枚举类型的名称即为组名,枚举类型的字段即为计数器名
计数器是全局的,会跨越全部Mapper和Reducer进行使用,并在做业结束的时候产生一个结果web

例如,现有枚举类型以下:apache

enum Temperature{
    MISSING,
    MALFORMAT
}

在MapReduce程序中能够这样来使用计数器:数组

context.getCounter(Temperature.MISSING).increment(1);
context.getCounter(Temperature.MALFORMAT).increment(1);

动态计数器

因为枚举类型在编译的时候就肯定了全部字段,可是某些状况下咱们可能要根据未知的名称来命名计数器
这个时候就可使用动态计数器来实现:缓存

context.getCounter("计数器组名","计数器名").increment(1);

这里的计数器名的得到方式能够是任意的,例如动态获取的字段值等
可是大部分状况下,枚举类型能够足够使用了,并且枚举类型阅读性较强,易于使用,并且是类型安全的
因此推荐尽量的使用枚举类型安全

在代码中获取计数器的值

除了经过Web UI、CLI和-counter参数得到做业的计数器,用户也能够经过代码在程序中获取计数器的值:app

String jobId = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobId.forName(jobId));
if(job == null){
    System.err.println("No job whih ID %s found",jobId);
    return -1;
}
if(!job.isComplete()){
    System.err.println("Job %s is not complete",jobId);
    return -1;
}
Counters counters = job.getCounters();
//关键代码
long missing = conters.findCounter(Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

排序

部分排序

部分排序是指在map阶段,对每一个分区中的数据进行排序的过程dom

Hadoop提交做业自定义排序和分组中能够看到
MapReduce中控制部分排序的方法不仅有一种,控制排序的顺序以下:分布式

1.若是设置了mapreduce.job.output.key.comparator.class属性或者setComparatorClass()方法,则使用设置的类进行部分排序
2.不然,键必须是WritableComparable的子类,并使用针对该键类型的已经注册的comparator
3.不然,使用RawComparator将字节流反序列化为对象,并调用WritableComparable的comparaTo()方法

咱们在自定义数据类型的时候继承自WritableComparable,并重写了comparaTo方法,这里的设置是最后才会使用的
若是定义了RawComparator/WritableComparator的具体实现类,那么将会优先使用这个设置,由于其能够直接对比字节流数组

全排序

MapReduce Shuffle阶段的排序只针对各个单独的分区,也就是以前讨论到的部分排序
对于每一个分区,其数据是有序的,可是从数据的整体来看,是无序的
如何让MapReduce产生全局有序的数据呢?
最简单的办法是只使用一个分区,可是这就丧失了MapReduce并行计算的特性,必须在单台机器上处理全部数据

事实上,除了使用一个分区,还有另一种方式既能够实现全局有序,也能够充分利用到MapReduce的并行计算能力
可是这个方法须要作一些额外的工做

思考一下,在部分排序中,每一个分区内的数据都是有序的,可是从分区的角度看就是无序的了
若是咱们可以确保分区也是有序的呢?,例如分区1保存1-100的数据,分区2保存101-200的数据,一次类推
那么从分区的角度看,各个分区之间是有序的,而分区内部的数据也是天然有序的
从而就作到了数据的全局有序

可是在这个过程当中须要注意一个状况:如何确保每一个分区的数据量分配是均匀的?
由于在实际场景中,1-100中包含的数据可能有1000个,而101-200的数据只有50个,这就形成了数据倾斜的问题

为了解决这个问题,咱们一般须要深刻的了解数据的组成特性
可是在海量数据的状况下,不可能对所有数据进行检查
这时咱们可使用采样的方式来进行

采样的核心思想是只查看一小部分的键,得到键的近似分布由此构建分区

Hadoop中已经内置了若干的采样器,接口以下:

public interface Sampler<K,V>{
    K[] getSample(InputFormat<K,V> inf,Job job) throw IOException,InterruptedException;
}

可是一般不会直接使用这个getSample接口,而是由InputSampler的writePartitionFile方法调用
目的是建立一个SequenceFile来存储定义分区的键

public static <K,V> writePartitionFile(Job job,Sampler<K,V> sampler) throw IOException,ClassNotFoundException,InterruptedException

该SequenceFile会被TotalOrderPartitioner使用来为做业建立分区:

//设置分区类为TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
//使用随机采样器,采样率为0.1,最大样本数和最大分区数为10000何10,任意一个条件知足以后即刻中止采样
InputSampler.Sampler<IntWritable,Text> sampler = new InputSampler.RandomSampler<IntWritable,Text>(0.1,10000,10);
//使用该采样器建立定义分区键的SequenceFile
InputSampler.writePartitionFile(job,sampler);
///得到该SequenceFile并加入分布式缓存中共享
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI uri = new URI(partitionFile);
jov.addCacheFile(uri);

这个采样器将会运行在客户端,因此会从集群上下载数据,须要注意下载的数据量不要太大否则运行时间好久
使用该方法还能够自由的设置reducer的任务数,即分区数,经过mapreduce.job.reducers来设置最后须要产生多少个均匀的分区

RandomSampler是一种比较通用的采样器,除了它,还有另一些例如:

  • SplitSampler:只采样一个分片中的前n条记录,没有从所有分片中普遍采样,因此不适合已经排好序的数据
  • IntervalSampler:以必定的间隔从分片中选择键,所以很适合排过序的数据

二次排序

二次排序即为对数据的值进行排序,其实在Hadoop I/O的序列化小节中
就已经讨论过这个问题了,具体案例能够参考:Hadoop提交做业自定义排序和分组

Join链接

使用MapReduce进行链接操做的方式和技巧取决于数据集的规模和结构
若是一个数据集很大,另一个很小,彻底可使用MapReduce中的DistributedCache
将小数据集分发到各个节点上

若是两个数据集都很大,那么又能够分为Map端的Join和Reduce端的Join

Map端的Join

Map端的Join操做会在数据到达map函数以前执行
为了达到这个目的,Map端的输入数据必须:

1.两个数据集被划分为数量相同的分区
2.两个数据集按照相同的键进行排序

因为Map能够设置以前执行的多个做业的输出为其输入,按照以上条件
此时输入数据应该知足:

1.两个做业有相同的reduce数量
2.键是相同的且不可分割

知足Map端Join操做的要求以后,能够利用org.apache.hadoop.mapreduce.join包中的ComsiteInputFormat类在map函数以前执行join操做

Reduce端的Join

比起Map端,Reduce端的Join对数据的要求没有那么高,利用Shuffle相同键的记录会被输入到同一个reducer(分区)中的特性
Reducer端能够自然进行Join操做,可是因为数据要通过Shuffle过程,因此效率每每比Map端的Join要低

并且在Reduce端的Join中,还能够利用到以前讨论的二次排序
有时候join链接须要一个数据集先于另外一个数据集到达reduce函数,这时候咱们能够听过二次排序对数据的值作一个标号
先要达到的数据标号设置为0,另一个数据集设置为1,而后根据这个标号进行排序就能够实现让想要的数据集先一步到达reduce

边数据分布

所谓的边数据(Side Data)能够理解为MapReduce做业执行过程当中
全部任务都有可能要使用到的只读的的数据,用以辅助处理主数据

使用JobConfiguration

Configuration类的各类setter方法能够方便的设置一些键值对类型的数据
用户能够经过getConfiguration方法得到配置的信息

这种方式足以应对不少只须要设置一些属性的场合
可是其缺点是:

  • 只适合相似属性设置的小数据
  • 对于很复杂的对象,用户须要本身设置序列化和反序列化
  • 每次读取配置的时候全部设置都将读取内存,无论有没有用到

DistributedCache

分布式缓存机制在做业运行以前将用户设置的数据拷贝到各个节点中以供使用
缓存的容量大小默认为10G,能够经过yarn.nodemanager.localizer.cache.target-size-mb来配置(以字节为单位)

具体的使用方式参考:MapReduce中的DistributedCache

做者:@小黑