【大数据分析经常使用算法】1.二次排序

简介

本算法教程系列创建在您已经有了spark以及Hadoop的开发基础,若是没有的话,请观看本博客的hadoop相关教程或者自行学习,代码会在博客文档写到必定程度统一放到github下。java

二次排序是指在reducer阶段对与某个键关联的值进行排序,也叫做值转换。git

MapReduce框架会自动对映射器生成的键进行排序。这说明,在reducer阶段准备执行以前,他得到的数据必然是按键有序的(而不是值)。传入各个reducer的值并非有序的,他们的顺序咱们没法肯定(取决于资源调配过程的处理顺序)。咱们的实际需求中,对值的排序状况是很常见的。为了解决这种问题,咱们须要用到二次排序的设计模式。github

二次排序问题是指在reducer阶段对与某个键关联的值进行排序,也叫做值转换。为了理解方便,咱们定义通常的MapReduce处理过程的公式以下: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ reduce(key_{2},list(value_{2})) => list(key_{3},value_{3}) $$算法

咱们对这两个公式进行说明:首先,map函数接受一个k1-v1,而后他会输出任何数量的k2-v2对。接下来,reduce函数接收另外一个k-list(v)做为输入,而后将其进行处理,输出另外一个k-v。apache

显然reduce函数的输入list(value_{2})中的{v1,v2,vn....}是一个无序的,二次排序的目的就是让他们变得有序!所以,咱们能够根据上面的公式的模式来定义二次排序的公式,以下所示: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ sort(V_{1},V_{2}....V_{n}) => ({S_{1},S{2}}...S_{n}) $$ V表明无序变量,S表明有序变量。设计模式

和以前同样,学以至用。既然理解了二次排序的概念,咱们就来经过一些可以触类旁通的用例来掌握二次排序的设计模式。app

一、案例

1.一、需求

假定咱们有以下的温控数据。框架

2018,01,01,10
2018,01,02,5
2018,01,03,3
2018,01,04,12
2016,11,05,20
2016,11,15,30
2016,03,25,11
2016,04,22,19
2015,06,11,22
2015,06,10,33
2015,07,08,21
2015,02,06,5
2017,11,05,5
2017,11,04,0
2017,02,02,3
2017,02,03,9
2014,06,11,22
2014,06,10,33
2014,07,08,21
2014,07,06,5

该数据是按逗号分隔的每一行,分别表明年、月、日以及当天的温度。用公式表现为 $$ L_{0}=Year,L_{1}=Month,L_{2}=Day,L_{3}=Temperature,S=',' $$ 其中L表明列的意思,下标表明列因此(从0计数),S表明分隔符号,这里的分隔符是英文逗号,整个文件以UTF-8编码(若数据不含中文能够没必要在乎)。dom

如今咱们要求处理这段数据,按年月排序倒叙输出的同时(忽略天气),统计各个月以内温度的变化趋势,一样按温度的降序排列,输出在同一行,下面是输出范例:ide

2018-01	12,10,5,3
2017-11	5,0
2017-02	9,3
2016-11	30,20
...
...

需求公式以下: $$ V(List(dateAndTemperature)) => S(List(date, S(Listtemparature))) $$ 其中date删除了日期。

二、项目解决

2.一、Mapper的输出key

第一步,肯定Map阶段生成的key,显然,咱们得将其定义为包含年月以及温度的WritableBean。

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;

public class DateTemperature implements Writable {
    // year month
    private String yearMonth;
    // day
    private String day;
    // temperature
    private Integer temperature;

    // reflect must be need.
    public DateTemperature() {
    }

    public DateTemperature(String yearMonth, String day, Integer temperature) {
        this.yearMonth = yearMonth;
        this.day = day;
        this.temperature = temperature;
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }

    // 序列化接口
    public void write(DataOutput out) throws IOException {
        out.writeUTF(yearMonth);
        out.writeUTF(day);
        out.writeInt(temperature);
    }

    public void readFields(DataInput in) throws IOException {
        yearMonth = in.readUTF();
        day = in.readUTF();
        temperature = in.readInt();
    }

    @Override
    public String toString() {
        return yearMonth + ","+ temperature;
    }
}

因为咱们要将DateTemperature定义为Mapper的输出key,所以,咱们还须要定制其排序逻辑,让mapper阶段的shuffle为咱们自动排序。

按照需求,应该优先按年月进行排序,再而后考虑温度,即所谓的thenby方式。注意与二次排序的设计模式区分开。

public class DateTemperature implements Writable, WritableComparable<DateTemperature> {
    ...
        // 排序
    public int compareTo(DateTemperature o) {
        int result = this.getYearMonth().compareTo(o.getYearMonth());
        if(result == 0){
            result = this.getTemperature().compareTo(o.getTemperature());
        }
        // 降序排序。若要升序排序,直接返回result.
        return  -1 * result;
    }

}

2.二、Mapper

肯定了Bean以后,咱们就能够来定制Mapper的逻辑了。代码以下:

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TemperatureMapper extends Mapper<LongWritable, Text, DateTemperature, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokens = line.split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        // tokens[2] is Day, it isn't necessary.
        String day = tokens[2];
        Integer temperature = Integer.valueOf(tokens[3]);
        // k->bean v-temperature
        context.write(new DateTemperature(yearMonth,tokens[2], temperature),new IntWritable(temperature));

    }
}

能够看到,Mapper输出的是(Bean-温度)的键值对。若是咱们不对输出的key作任何处理,显然reduce任务会根据key对象的hash值肯定处理次数(分组)。在这种状况下,咱们须要将年月相同的记录聚集到一个reducer进行处理。

要实现该功能,就须要使用自定义的分组比较器。

2.三、GroupComparator

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TemperatureGroupComparator extends WritableComparator {

    public TemperatureGroupComparator(){
        super(DateTemperature.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((DateTemperature) a).getYearMonth().compareTo(((DateTemperature)b).getYearMonth());
    }
}

该分组比较器取代mapreduce的默认分组比较行为(按key的hash),修改成按key中的年月字典序进行比较,这样,就能够将他们聚集在一块儿,统一由一个reducer处理了。

2.四、Reducer

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TemperatureReducer extends Reducer<DateTemperature, IntWritable,Text, Text> {
    @Override
    protected void reduce(DateTemperature key, Iterable<IntWritable> temperatures, Context context) throws IOException, InterruptedException {
        StringBuilder values = new StringBuilder();
        for (IntWritable temperature : temperatures) {
            values.append(temperature);
            values.append(",");
        }
        // delete the last symbol
        values.deleteCharAt(values.length() - 1);
        // output like 2018-01 22,23...
        context.write(new Text(key.getYearMonth()), new Text(values.toString()));
    }
}

reducer的逻辑比较简单,将发送到本身的数据进行汇总,按需求的格式进行输出。注意此处的temperatures,已是排好序的了。

最后,咱们编写Driver类,编写常规的mapreduce模板代码。

2.五、Driver驱动类

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.UUID;

public class TemperatureDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TemperatureDriver.class);
        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);

        job.setMapOutputKeyClass(DateTemperature.class);
        job.setMapOutputValueClass(IntWritable.class);

        //job.setPartitionerClass(TemperaturePartition.class);
        //job.setNumReduceTasks(2);

        // add your group comparator class.
        job.setGroupingComparatorClass(TemperatureGroupComparator.class);

        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path("temperature.txt"));
        FileOutputFormat.setOutputPath(job,new Path("output_" + UUID.randomUUID()) );
        System.exit(job.waitForCompletion(true)? 1:0);

    }
}

最后编写千篇一概的驱动类,别忘了使用job.setGroupingComparatorClass(TemperatureGroupComparator.class);往任务中注册咱们的自定义分组比较器,最后执行代码,能够得到一个输出文件,输出结果以下所示:

2018-01	12,10,5,3
2017-11	5,0
2017-02	9,3
2016-11	30,20
2016-04	19
2016-03	11
2015-07	21
2015-06	33,22
2015-02	5
2014-07	21,5
2014-06	33,22

三、项目分析

第二节咱们快速的搭建了一个解决方案,有一些细节须要特别的分析一下。在这里,咱们须要对Map输出的中间键——DateTemperature进行分析。

为了实现二次排序,咱们就须要控制DateTemperature的排序、以及reducer处理键的顺序。咱们将须要做用的日期以及温度组合了起来,造成了一个组合键,他们之间的关系以下图所示:

天然值也能够理解为键值对去除天然键以后的剩余部分。

二次排序的关键问题在于,如何找出天然键、组合键,以及肯定天然值。

四、优化

本节没有使用分区功能,全部的数据都发往同一个reducer任务上,好比下面的日志:

2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce > reduce
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local1691658871_0001_r_000000_0' done.
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local1691658871_0001_r_000000_0
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce task executor complete.

都由任务编号为attempt_local1691658871_0001_r_000000_0的reducer task处理了。

接下来咱们指定多个分区,在每一个分区中,后台线程按键进行排序,最后,最终发往不一样的reducer进行处理,这很大程度的减轻了单台机器(若是是集群)的负担。

4.一、定义分区器

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TemperaturePartition extends Partitioner<DateTemperature, IntWritable> {
    @Override
    public int getPartition(DateTemperature dateTemperature, IntWritable temperature, int numPartitions) {
        return Math.abs(dateTemperature.getYearMonth().hashCode() % numPartitions);
    }
}

咱们按天然键对reducer任务数取余的结果(绝对值保证不为负)做为分区的编号。

4.二、驱动器注册分区器并设置分区数

job.setNumReduceTasks(2);

这样,就能够进行一步优化了,最后汇总的文件会有两份,分别对应不一样的分区结果,请留意。

六、总结

二次排序是一个控制reducer对值进行排序的设计模式。当咱们须要排序的数据不止一列以上,或者须要对值进行排序,那么能够考虑这种模式。设计模式以下:

  • 肯定组合键,将组合键做为屡次排序的标准
  • 肯定天然键,经过分组比较器控制天然键相同的数据聚集到同一个reducer进行处理;
  • 天然值的处理放在reducer中按业务需求肯定,整体来讲,和二次排序的关键逻辑并没有多大关系。
相关文章
相关标签/搜索