【hadoop】22.MapReduce-shuffle之分区

简介

分区(partition),属于Mapper阶段的流程。前面提到,线程首先根据数据最终要传的reducer把数据分红相应的分区(partition)。java

默认状况下,采起的分区类是HashPartitionergit

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

显然,默认分区是根据key的hashCode对reduceTasks个数取模获得的。用户无法控制哪一个key存储到哪一个分区,所以咱们经常须要自定义分区类。github

编写自定义分区并应用的流程大体分为3步。apache

  1. 自定义类继承Partitioner,重写getPartition()方法;
  2. 在job驱动类中,设置自定义partitioner;
  3. 自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task。

为何须要设置reduce task的数量?app

  • 若是reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  • 若是1 < reduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
  • 若是reduceTask的数量 = 1,则无论mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

例如:假设自定义分区数为5,则ide

  • job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
  • job.setNumReduceTasks(2);会报错
  • job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件;

一、分区案例——归属地分区电话号码流量统计

接下来咱们完善以前的电话号码手机流量统计项目。oop

完善需求:将统计结果按照手机归属地不一样省份输出到不一样文件。this

根据手机号的前三位能够区分不一样归宿地的电话号码。线程

一、数据准备:依然是以前的输入文件phone_data.txt;code

二、分析

Mapreduce中会将map输出的kv对,按照相同key分组,而后分发给不一样的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发。

若是要按照咱们本身的需求进行分组,则须要改写数据分发(分组)组件Partitioner 自定义一个CustomPartitioner继承抽象类:Partitioner

最后再从job驱动中,设置自定义partitioner(以前咱们使用默认的): job.setPartitionerClass(CustomPartitioner.class)

三、编写代码

(1)、在以前的项目中添加分区类

package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartition extends Partitioner<Text, FlowBean> {
    public static final String PARTITION_136 = "136";
    public static final String PARTITION_137 = "137";
    public static final String PARTITION_138 = "138";
    public static final String PARTITION_139 = "139";
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // default partition is 0.
        int partition = 0;

        // get the phone left 3 number.
        String phonePrefix = text.toString().substring(0,3);

        // get partition.
        if(PARTITION_136.equals(phonePrefix)){
            partition = 1;
        }else if(PARTITION_137.equals(phonePrefix)){
            partition = 2;
        }else if(PARTITION_138.equals(phonePrefix)) {
            partition = 3;
        }else if(PARTITION_139.equals(phonePrefix)) {
            partition = 4;
        }
        return partition;
    }
}

(2)、在驱动类中指定自定义分区类

package com.zhaoyi.phoneflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        if(args.length != 2){
            System.out.println("Please enter the parameter: data input and output paths...");
            System.exit(-1);
        }
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置分区类
        job.setPartitionerClass(FlowPartition.class);
        // 咱们设置了5个分区,对应上。
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 1:0);
    }
}

注意分区返回数字是从0开始计数。

以前咱们未指定分区类的时候,生成一个分区结果文件。这一次就会生成5个分区文件了,分别对应不一样地区的电话号码记录。

本案例的具体代码参见github项目phoneflow模块。

二、分区案例 —— wordcount大小写分区

既然改造了电话号码案例,咱们也来改造一下wordcount案例,将单词统计根据首字母大小写不一样输出为2个文件。即大写字母开头的输出为一个文件,小写字母开头的输出为1个文件。

一、分析

只需在原有项目的基础上,添加分区类:获取每一个单词的首字母,直接使用JAVA Character类API判断是否是小写字母,若不是,则统一断定为大写字母。

二、编写代码,添加分区类

package com.zhaoyi.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        char firstLetter = text.toString().charAt(0);
        if(Character.isLowerCase(firstLetter)){
            return 1;
        }
        return 0;
    }
}

别忘了在驱动类中设置自定义分区类。

三、查看输出结果

分区文件有2个,分别表明大小写字母的统计结果

#### part-r-00000
Alice	2
So	1
`and	1
`without	1

#### part-r-00001
a	3
and	1
bank	1
......
......
the	3
this	1
......
......
was	3
what	1

注意省略号表明我删除掉的一些记录,只是为了限制篇幅,不属于文件内容。

本案例见github项目的word-count模块。

相关文章
相关标签/搜索