MapReduce Design Patterns(chapter 2(part 1))(二)

随着天天都有更多的数据加载进系统,数据量变得很庞大。这一章专一于对你的数据顶层的,归纳性意见的设计模式,从而使你能扩展思路,但可能对局部数据是不适用的。归纳性的分析都是关于对类似数据的分组和执行统计运算,建立索引,或仅仅为了计数。java

 

经过分组数据集计算聚合排序是一种快速获取结果的好方法。例如,你可能想按某种规则计算出所存的钱的总数,或者按人口计算人们在互联网花费的平均时长。对于新的数据集,你能够开始用这些分析类型帮你计算出数据中什么东西有趣或惟一,和哪些须要仔细研究。sql

 

本章的模式有数值聚合,反向索引,和用计数器计数。书中简洁的MapReduce程序比其它的模式要多。这是由于根据key分组数据是MapReduce规范的核心机制:全部key经过分组聚在一块儿并在reduces端收集。当你把想要分组的字段做为key发送时,分组是MapReduce框架自动处理的。apache

 

Numerical Summarizations

Pattern Description

数值聚合模式是一个用于计算细节上的数据的统计值的通用模式,这种模式很容易形成误解。这种模式下,使用combiner和理解要执行的计算很是重要。设计模式

 

Intent

根据key分组记录并每组计算聚合值,能够对大数据集有更高层次的认识。假设θ是咱们想要执行的聚合方法,要计算的值是列表values(v1, v2, v3,…, vn)),要想求出聚合值λ,令:λ=θ(v1, v2, v3, …, vn).θ的种类有最大值,最小值,平均值,中值,标准差。网络

 

Motivation

如今对于不少大的数据集,咱们手动读它们并获得有意义的信息是很困难的。例如你的网站日志,一个用户每次登录,键入查询,或执行其余明显的动做,要想靠阅读上TB的文本监控这个用户实时的行为是极其困难的。若是按天天的小时分组,计算每组记录的数量,你将会描绘出数量的直方图,并识别网站的活跃时间。类似的,若是把广告按类型分组,你将会把广告推向更好的市场定位。也许你会基于在一天有效的时间投放循环广告。全部这种类型的问题均可以用数值聚合解决。app

Applicability

数值聚合的使用需知足如下两个条件:框架

一、  处理数值类型数据或作计数。less

二、  数据能根据指定字段分组。oop

 

Structure

图2-1展现了MapReduce中数值聚合执行的结构图。MapReduce组件每部分都有详细的描述:post

 

Figure 2-1. The structure of the numerical summarizations pattern

 

•mapper的输出keys由分组的字段组成,values是任意相关数值型的条目。能够假设mapper配置一张关系表,表的列跟要执行θ方法的字段关联,而且每一行都包含mapper输出的记录。Mapper输出的value包含每一列的值,输出key将表做为一个总体,由于每一个表都是由MapReduce的分组方法建立的。

 

Notice:分组会涉及到将大量子数据集发送到到要运行的reduce端,每一个输入记录都有可能成为map的输出。确保尽可能少的须要分析的数据发送到reduce端,而且处理好坏的输入条件。

 

•combiner经过数值聚合能有效减小经过网络传给reduce的中间键值对的数目。若是θ方法是关联的而且是可交换的就能达到目的。就是说,若是能任意改变值得顺序和进行任意的分组计算而对最终结果无影响,就能够用combiner。这样的combiner在下面的部分会论述。

 

•作数值求和时能从自定义partitioner中更好的向若干reduce任务分发键值对受益。这种需求不多,一旦有job执行时间吃紧,数据量庞大,并且有严重数据倾斜时,它能发挥做用。

 

Notice:自定义的partitioner常常被人们忽略,可是,花时间理解基于此的作分组时输出键的分布和分区会提升性能(还有其它这种状况的)。假如启动一百个reduce任务,80个用30秒完成,其它的用25分钟,这是很低效的。

 

•reducer接收一系列与根据key分组的记录相关联的数值型values(v1, v2, v3,…, vn),执行方法λ =θ(v1, v2, v3, …, vn).λ的值同给定的输入key一块输出。

 

Consequences

Job的输出会由每一个reducer输入组生成的包含一条记录的多个文件组成。每条记录包含key和聚合值。

 

Known uses

Word count:

就是MapReduce的hello world程序。程序对每一个单词先输出key为单词,value为整数1,而后根据key分组。Reduce阶段输出每一个惟一单词和整数加起来的和。第一章能够看到这个例子。

Record count:

一种经常使用的根据特定时间周期(周,日,时等)获取数据流量规律的分析方法。

Min/max/count:

一种计算最小,最大值,或特定事件总和的分析。例如,用户第一次发帖时间,最后一次发帖时间,和一段时间内发帖的总数。你没必要一次计算出这三个聚合值,其余使用案例也列在这了,若是仅对其中某个感兴趣。

Average/Median/Standard deviation:

跟最大最小求和类似,但不是一种简单的实现,由于操做是不相关的。三个均可以用combiner,但相比单纯重复reduce的逻辑,它们须要一种更复杂的处理过程。

 

Resemblances

SQL:

数值聚合模式跟sql里分组后再聚合类似:

SELECT MIN(numericalcol1), MAX(numericalcol1), COUNT(*) FROM TABLE GROUP BY groupcol2;

Pig:

Group by部分用foreach generate替换:b = GROUP a BY groupcol2;

c = FOR EACH b GENERATE group, MIN(a.numericalcol1),

MAX(a.numericalcol1), COUNT_STAR(a);

 

Performance analysis

若是combiner适当的运用,使用这种模式能让聚合运算可以执行的很好。MapReduce就是为这些种类的工做出现的。跟书中大多数模式同样,开发者须要关注使用适当的reduce的个数而且考虑可能在reduce组里出现的数据倾斜。就是说,若是一个key产生的中间键值对比其余key多,这个key对应的reducer就会比其余reducer执行更多的工做。

 

Numerical Summarization Examples

Minimum, maximum, and count example

这三种计算都是数值聚合模式的优秀的程序。分组操做之后,reducer端只须要迭代跟分组相关联的值并找到最小,最大和每一个key分组的和。因为关联性和可互换性,combiner能极大得减小须要发送的reduce端shffled的中间键值对。若是实现的功能恰当,reducer的代码能够跟combiner一致。

 

下面每部分代码描述了这种问题的情形。

Problem:给出用户评论内容的列表,获得第一次和最后一次评论时间,和这个用户评论总条数。

 

Minmaxcounttuple code。

MinMaxCountTuple类有三个属性,并实现writable接口,用于mapper的输出值。当用分隔符把这些值放进一个Text对象,最好建立个自定义的writable。这样不只整洁,也没必要担忧从reduce阶段获取这些值是的字符串解析。这种自定义writable对象也广泛用于这种模式下的其余例子。下面就是代码,本章其它writables跟这个相似,为了简介,咱们会省略掉。

 

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MinMaxCountTuple implements Writable {
    private Date min = new Date();
    private Date max = new Date();
    private long count = 0;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public Date getMin() {
        return min;
    }

    public void setMin(Date min) {
        this.min = min;
    }

    public Date getMax() {
        return max;
    }

    public void setMax(Date max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public void readFields(DataInput in) throws IOException {
        // Read the data out in the order it is written,
        // creating new Date objects from the UNIX timestamp
        min = new Date(in.readLong());
        max = new Date(in.readLong());
        count = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        // Write the data out in the order it is read,
        // using the UNIX timestamp to represent the Date
        out.writeLong(min.getTime());
        out.writeLong(max.getTime());
        out.writeLong(count);
    }

    public String toString() {
        return frmt.format(min) + "\t" + frmt.format(max) + "\t" + count;
    }

  

Mapper code。Mapper会从每行输入记录(用户id和建立数据)中抽取的xml属性做为输入值,执行预处理。输入键忽略掉,建立数据为了在combiner和reduce中容易比较而转换成java date类型。输出键是用户id,值是将要输出的三个列:最小日期,最大日期,和用户评论的总条数。三个列存在writable类型对象里,前两个时间类型,最后一个long类型。这些对reducer来讲很精确,但不会影响到mapper中的使用,咱们也但愿在mapper和reducer中使用相同的数据类型。在mapper中,设置最小最大建立日期。为了充分发挥随后讲到的combiner的优点,日期输出两次。第三列给计数值1,代表这个用户提交了一条评论。事实上,在reducer阶段,全部的计数会被加到一块儿,也会算出最大最小日期。

 

    public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> {
        // Our output key and value Writables
        private Text outUserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();
        // This object will format the creation date string into a Date object
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field since it is what we are finding
            // the min and max value of
            String strDate = parsed.get("CreationDate");
            // Grab the“UserID” since it is what we are grouping by
            String userId = parsed.get("UserId");
            // Parse the string into a Date object
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // Set the minimum and maximum date values to the creationDate
            outTuple.setMin(creationDate);
            outTuple.setMax(creationDate);
            // Set the comment count to 1
            outTuple.setCount(1);
            // Set our user ID as the output key
            outUserId.set(userId);
            // Write out the hour and the average comment length
            context.write(outUserId, outTuple);
        }
    }

  

Reducer code。Reducer会迭代全部值找出最小,最大日期和统计总和。一开始咱们对每一个输入组初始化要输出的结果。对组内的每一个输入值,若是输出结果的最小值没设定,或比当前输出结果中存的当前最小值小,咱们就把这个输入值设置为输出结果的最小值。最大值的逻辑也是这样,惟一不一样的是用了大于号。每一个值的计数值被加到sum和中,跟word count程序类似。经过全部输入值算出最大最小值之后,最终的计数就是要输出的值。键和值被写到文件系统中。

 

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        // Our output value Writable
        private MinMaxCountTuple result = new MinMaxCountTuple();

        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {
            // Initialize our result
            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            // Iterate through all input values for this key
            for (MinMaxCountTuple val : values) {
                // If the value's min is less than the result's min
                // Set the result's min to value's
                if (result.getMin() == null || val.getMin().compareTo(result.getMin()) < 0) {
                    result.setMin(val.getMin());
                }

                // If the value's max is more than the result's max
                // Set the result's max to value's
                if (result.getMax() == null || val.getMax().compareTo(result.getMax()) > 0) {
                    result.setMax(val.getMax());
                }
                // Add to our sum the count for value
                sum += val.getCount();
            }
            // Set our count to the number of input values
            result.setCount(sum);
            context.write(key, result);
        }
    }

  

Combiner optimization。本例的Reducer正好能够用做job的combiner。由于咱们仅仅关心记录条数,最小时间,最大时间。同一个用户的多条记录没必要都发送到reducer。在本地map任务上能够先算出最大最小评论日期,这样对最终的值是没有影响的。当计数操做是关联的,并是可交换的时,combiner的使用不会影响计数结果。

 

Data flow diagram。图2-2给出了mapper,combiner,reducer之间的流程来帮助咱们描述他们之间的交互过程。用数值简单的表明日期,概念上是同样的。Combiner极可能执行mapper输出的全部组,决定最大最小值做为前两个列,并分组求和。而后输出最小最大值,和这个心的计数和。若是combiner没有运行在任何记录上,在reducer里仍然是能够计算的。

 

 

Figure 2-2. The Min/Max/Count MapReduce data flow through the combiner

 

Average example

计算平均值,假设在分组里面须要两个值:须要要求和的值的个数和值的总和。这两个值能在reduce端细致的计算。经过遍历集合中的每一个值,累加到一个保存总和的变量里。以后,简单的经过计数划分结果,并输出平均值。然而,在这里咱们不能用reduce的实现当作combiner,由于平均值的计算是非关联操做。相反,Mapper要输出两列数据,数值个数和平均值。每条输入记录计数1。Reduce经过计数和平均值的成绩得到总和,累加计数做为总的数值个数和。这样经过动态的计数计算动态的数值和,而后输出计数和平均值。使用这种迂回策略,reducer代码就能用做combiner,由于相关性获得了保存。

 

下面代码描述了这种问题。

问题:给出用户评论数据,计算一天内每一个小时评论的长度的平均值。

 

Mapper code。Mapper将会处理每条输入记录并计算基于时间的评论内容的平均长度。输出键是小时,从xml数据文件属性中可获得。输出值有两列,评论的条数和这一小时内评论的平均长度。由于mapper每次处理一条记录,计数为1,平均长度就是这条评论的长度。这两个值经过自定义的writable类输出,这个类包含两个float数值字段,一个计数字段,还有一个平均值。

 

    public static class AverageMapper extends Mapper<Object, Text, IntWritable, CountAverageTuple> {
        private IntWritable outHour = new IntWritable();
        private CountAverageTuple outCountAverage = new CountAverageTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field,
            // since it is what we are grouping by
            String strDate = parsed.get("CreationDate");
            // Grab the comment to find the length
            String text = parsed.get("Text");
            // get the hour this comment was posted in
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            outHour.set(creationDate.getHours());
            // get the comment length
            outCountAverage.setCount(1);
            outCountAverage.setAverage(text.length());
            // write out the hour with the comment length
            context.write(outHour, outCountAverage);
        }
    }

  

Reducer code。Reducer代码迭代某小时内全部值并保存在两个本地变量:动态的count变量和动态的sum变量。对每一个值,count乘上平均值加到sum上。Count简单的加到动态count变量里。迭代以后,输入key,动态count,和经过动态count,动态sum计算出来的平均值写到文件中。

 

    public static class AverageReducer extends Reducer<IntWritable, CountAverageTuple, IntWritable, CountAverageTuple> {
        private CountAverageTuple result = new CountAverageTuple();

        public void reduce(IntWritable key, Iterable<CountAverageTuple> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            // Iterate through all input values for this key
            for (CountAverageTuple val : values) {
                sum += val.getCount() * val.getAverage();
                count += val.getCount();
            }
            result.setCount(count);
            result.setAverage(sum / count);
            context.write(key, result);
        }
    }

  

Combiner optimization。计算平均值时,当reduce代码输出计数和平均值时能够用做combiner。求平均值不是相关联的操做,若是count和平均值(原文为count,本人认为有误)从reducer一块输出,这两个值的乘积能够保存起来用于reduce阶段的sum。若是不输出这个count,combiner就不能用,由于平均数的平均数并非正确的平均数。通常来讲,count和平均值一块写到文件系统不会有问题。然而,若是count妨碍了接下来的分析,那就去掉count,编写一个跟reduce类似的combiner的实现。这两种实现的惟一区别是写不写count。

 

Data flow diagram。图2-3展现了流程图。

Figure 2-3. Data flow for the average example

相关文章
相关标签/搜索