1.如何解决MapReduce二次排序?
2.Map端如何处理?
3.Reduce端如何处理?
4.MapReduce二次排序是如何具体实现的呢?
1、概述
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序能够知足一部分需求,可是也是十分有限的。在咱们实际的需求当中,每每有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,网络上已经有不少人分享过了,可是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析仍是有很是大的出入,并且部分分析是没有通过验证的。本文将经过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,而且经过结果和map、reduce端的日志来验证所描述的处理流程的正确性。
2、需求描述
一、输入数据:
sort1 1
sort2 3
sort2 77
sort2 54
sort1 2
sort6 22
sort6 221
sort6 20
二、目标输出
sort1 1,2
sort2 3,54,77
sort6 20,22,221
3、解决思路
一、首先,在思考解决问题思路时,咱们先应该深入的理解MapReduce处理数据的整个流程,这是最基础的,否则的话是不可能找到解决问题的思路的。我描述一下MapReduce处理数据的大概简单流程:首先,MapReduce框架经过getSplit方法实现对原始文件的切片以后,每个切片对应着一个map task,inputSplit输入到Map函数进行处理,中间结果通过环形缓冲区的排序,而后分区、自定义二次排序(若是有的话)和合并,再经过shuffle操做将数据传输到reduce task端,reduce端也存在着缓冲区,数据也会在缓冲区和磁盘中进行合并排序等操做,而后对数据按照Key值进行分组,而后没处理完一个分组以后就会去调用一次reduce函数,最终输出结果。大概流程我画了一下,以下图:java
二、具体解决思路
(1)Map端处理:
根据上面的需求,咱们有一个很是明确的目标就是要对第一列相同的记录合并,而且对合并后的数字进行排序。咱们都知道MapReduce框架不论是默认排序或者是自定义排序都只是对Key值进行排序,如今的状况是这些数据不是key值,怎么办?其实咱们能够将原始数据的Key值和其对应的数据组合成一个新的Key值,而后新的Key值对应的仍是以前的数字。那么咱们就能够将原始数据的map输出变成相似下面的数据结构:
{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那么咱们只须要对[]里面的新key值进行排序就ok了。而后咱们须要自定义一个分区处理器,由于个人目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,因此咱们须要根据新key值中的第一个字段来自定义一个分区处理器。经过分区操做后,获得的数据流以下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}
分区操做完成以后,我调用本身的自定义排序器对新的Key值进行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}
(2)Reduce端处理:
通过Shuffle处理以后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,而且没处理完一次分组以后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成相似下面的数据结构:
{sort1,[1,2]}
{sort2,[3,54,77]}
{sort6,[20,22,221]}
4、具体实现
一、自定义组合键
package com.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义组合键
* @author zenghzhaozheng
*/
public class CombinationKey implements WritableComparable<CombinationKey>{
private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class);
private Text firstKey;
private IntWritable secondKey;
public CombinationKey() {
this.firstKey = new Text();
this.secondKey = new IntWritable();
}
public Text getFirstKey() {
return this.firstKey;
}
public void setFirstKey(Text firstKey) {
this.firstKey = firstKey;
}
public IntWritable getSecondKey() {
return this.secondKey;
}
public void setSecondKey(IntWritable secondKey) {
this.secondKey = secondKey;
}
@Override
public void readFields(DataInput dateInput) throws IOException {
// TODO Auto-generated method stub
this.firstKey.readFields(dateInput);
this.secondKey.readFields(dateInput);
}
@Override
public void write(DataOutput outPut) throws IOException {
this.firstKey.write(outPut);
this.secondKey.write(outPut);
}
/**
* 自定义比较策略
* 注意:该比较策略用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,
* 发生地点为环形缓冲区(能够经过io.sort.mb进行大小调整)
*/
@Override
public int compareTo(CombinationKey combinationKey) {
logger.info("-------CombinationKey flag-------");
return this.firstKey.compareTo(combinationKey.getFirstKey());
}
}
说明:在自定义组合键的时候,咱们须要特别注意,必定要实现WritableComparable接口,而且实现compareTo方法的比较策略。这个用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(能够经过io.sort.mb进行大小调整),可是其对咱们最终的二次排序结果是没有影响的。咱们二次排序的最终结果是由咱们的自定义比较器决定的。
二、自定义分区器
package com.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义分区
* @author zengzhaozheng
*/
public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{
private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class);
/**
* 数据输入来源:map输出
* @author zengzhaozheng
* @param key map输出键值
* @param value map输出value值
* @param numPartitions 分区总数,即reduce task个数
*/
@Override
public int getPartition(CombinationKey key, IntWritable value,int numPartitions) {
logger.info("--------enter DefinedPartition flag--------");
/**
* 注意:这里采用默认的hash分区实现方法
* 根据组合键的第一个值做为分区
* 这里须要说明一下,若是不自定义分区的话,mapreduce框架会根据默认的hash分区方法,
* 将整个组合将相等的分到一个分区中,这样的话显然不是咱们要的效果
*/
logger.info("--------out DefinedPartition flag--------");
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
}
}
说明:具体说明看代码注释。
三、自定义比较器
package com.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义二次排序策略
* @author zengzhaoheng
*/
public class DefinedComparator extends WritableComparator {
private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class);
public DefinedComparator() {
super(CombinationKey.class,true);
}
@Override
public int compare(WritableComparable combinationKeyOne,
WritableComparable CombinationKeyOther) {
logger.info("---------enter DefinedComparator flag---------");
CombinationKey c1 = (CombinationKey) combinationKeyOne;
CombinationKey c2 = (CombinationKey) CombinationKeyOther;
/**
* 确保进行排序的数据在同一个区内,若是不在同一个区则按照组合键中第一个键排序
* 另外,这个判断是能够调整最终输出的组合键第一个值的排序
* 下面这种比较对第一个字段的排序是升序的,若是想降序这将c1和c2���过来(假设1)
*/
if(!c1.getFirstKey().equals(c2.getFirstKey())){
logger.info("---------out DefinedComparator flag---------");
return c1.getFirstKey().compareTo(c2.getFirstKey());
}
else{//按照组合键的第二个键的升序排序,将c1和c2倒过来则是按照数字的降序排序(假设2)
logger.info("---------out DefinedComparator flag---------");
return c1.getSecondKey().get()-c2.getSecondKey().get();//0,负数,正数
}
/**
* (1)按照上面的这种实现最终的二次排序结果为:
* sort1 1,2
* sort2 3,54,77
* sort6 20,22,221
* (2)若是实现假设1,则最终的二次排序结果为:
* sort6 20,22,221
* sort2 3,54,77
* sort1 1,2
* (3)若是实现假设2,则最终的二次排序结果为:
* sort1 2,1
* sort2 77,54,3
* sort6 221,22,20
*/
}
}
说明:自定义比较器决定了咱们二次排序的结果。自定义比较器须要继承WritableComparator类,而且重写compare方法实现本身的比较策略。具体的排序问题请看注释。linux