[大牛翻译系列]Hadoop(1)MapReduce 链接:重分区链接(Repartition join)

4.1 链接(Join)

链接是关系运算,能够用于合并关系(relation)。对于数据库中的表链接操做,可能已经广为人知了。在MapReduce中,链接能够用于合并两个或多个数据集。例如,用户基本信息和用户活动详情信息。用户基本信息来自于OLTP数据库。用户活动详情信息来自于日志文件。html

MapReduce的链接操做能够用于如下场景:数据库

  • 用户的人口统计信息的聚合操做(例如:青少年和中年人的习惯差别)。
  • 当用户超过必定时间没有使用网站后,发邮件提醒他们。(这个必定时间的阈值是用户本身预约义的)
  • 分析用户的浏览习惯。让系统能够基于这个分析提示用户有哪些网站特性尚未使用到。进而造成一个反馈循环。

全部这些场景都要求将多个数据集链接起来。apache

最经常使用的两个链接类型是内链接(inner join)和外链接(outer join)。以下图所示,内链接比较两个关系中全部的元组,判断是否知足链接条件,而后生成一个知足链接条件的结果集。与内链接相反的是,外链接并不须要两个关系的元组都知足链接条件。在链接条件不知足的时候,外链接能够将其中一方的数据保留在结果集中。数组

 

 

为了实现内链接和外链接,MapReduce中有三种链接策略,以下所示。这三种链接策略有的在map阶段,有的在reduce阶段。它们都针对MapReduce的排序-合并(sort-merge)的架构进行了优化。缓存

  1. 重分区链接(Repartition join)—— reduce端链接。使用场景:链接两个或多个大型数据集。
  2. 复制链接(Replication join)—— map端链接。使用场景:待链接的数据集中有一个数据集足够小到能够彻底放在缓存中。
  3. 半链接(Semi-join)—— 另外一个map端链接。使用场景:待链接的数据集中有一个数据集很是大,但同时这个数据集能够被过滤成小到能够放在缓存中。

在介绍完这些链接策略以后,还会介绍另外一个策略:决策树。能够根据实际状况选择最优策略。网络

 

4.1.1 重分区链接(Repartition join)

重分区链接是reduce端链接。它利用MapReduce的排序-合并机制来分组数据。它只使用一个单独的MapReduce任务,并支持多路链接(N-way join)。多路指的是多个数据集。架构

Map阶段负责从多个数据集中读取数据,决定每一个数据的链接值,将链接值做为输出键(output key)。输出值(output value)则包含将在reduce阶段被合并的值。app

Reduce阶段,一个reduce接收map函数传来的每个输出键的全部输出值,并将数据分为多个分区。在此以后,reduce对全部的分区进行笛卡尔积(Cartersian product)链接运算,并生成所有的结果集。框架

以上MapReduce过程如图4.2所示:ide

 

 

注:过滤(filtering)和投影(projection)

在MapReduce重分区链接中,最好可以减小map阶段传输到reduce阶段的数据量。由于经过网络在这两个阶段中排序和传输数据会产生很高的成本。若是不能避免reduce端的工做,那么一个最佳实践就是尽量在map阶段多过滤数据和投影。过滤指的是将map极端的输入数据中不须要的部分丢弃。投影是关系代数的概念。投影用于减小发送给reduce的字段。例如:在分析用户数据的时候,若是只关注用户的年龄,那么在map任务中应该只投影(或输出)年龄字段,不考虑用户的其余的字段。

 

技术19:优化重分区链接

《Hadoop in Action》给出了一个例子,说明如何使用Hadoop的社区包(contrib package)org.apache.hadoop.contrib.utils.join实现重分区链接。这个贡献包打包了全部的处理细节,仅仅须要实现一个很是简单的方法。

然而,这个社区包对重分区的实现方法的空间效率低下。它须要将待链接的全部输出值都读取到内存中,而后进行多路链接(multiway join)。实际上,若是仅仅将小数据集读取到内存中,而后用小数据集遍历大数据集来进行链接,这样将更加高效。

 

问题

须要在MapReduce中进行重分区链接,可是不但愿在reduce阶段将全部的数据都放到缓存中。

 

解决方案

这个技术运用了优化后的重分区框架。它仅仅将一个待链接的数据集放在缓存中,减小了reduce须要放在缓存中的数据。

 

讨论

附录D.1(http://www.cnblogs.com/datacloud/p/3617079.html)中介绍了优化后的重分区框架的实现。这个实现是根据org.apache.hadoop.contrib.utils.join社区包进行建模。这个优化后的框架仅仅缓存两个数据集中比较小的那一个,以减小内存消耗。图4.3是优化后的重分区链接的流程图:

 

 

 

图4.4是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。

 

 

使用这个链接框架须要实现抽象类OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。

例如,须要链接用户详情数据和用户活动日志。第一步,判断两个数据集中那一个比较小。对于通常的网站来讲,用户详情数据会比较小,用户活动日志会比较大。

在以下示例中,用户数据中有用户姓名,年龄和所在州

 

$ cat test-data/ch4/users.txt
anne 22 NY
joe 39 CO
alison 35 NY
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA
mary 53 NY
dave 36 VA
dude 50 CA

 

用户活动日志中有用户姓名,进行的动做,来源IP。这个文件通常都要比用户数据要大得多。

 

$ cat test-data/ch4/user-logs.txt
jim logout 93.24.237.12
mike new_tweet 87.124.79.252
bob new_tweet 58.133.120.100
mike logout 55.237.104.36
jim new_tweet 93.24.237.12
marie view_user 122.158.130.90

 

首先,必须实现抽象类OptimizedDataJoinMapperBase。这个将在map端被调用。这个类将建立map的输出键和输出值。同时,它还将提示整个框架,当前处理的文件是否是比较小的那个。

 

 1 public class SampleMap extends OptimizedDataJoinMapperBase {
 2 
 3   private boolean smaller;
 4 
 5   @Override
 6   protected Text generateInputTag(String inputFile) {
 7     // tag the row with input file name (data source)
 8     smaller = inputFile.contains("users.txt");
 9     return new Text(inputFile);
10   }
11 
12   @Override
13   protected String genGroupKey(Object key, OutputValue output) {
14     return key.toString();
15   }
16 
17   @Override
18   protected boolean isInputSmaller(String inputFile) {
19     return smaller;
20   }
21 
22   @Override
23   protected OutputValue genMapOutputValue(Object o) {
24     return new TextTaggedOutputValue((Text) o);
25   }
26 }

 

下一步,你须要实现抽象类 OptimizedDataJoinReducerBase。它将在reduce端被调用。在这个类中,将从map端传入不一样数据集的输出键和输出值,而后返回reduce端的输出数组。

 

 1 public class SampleReduce extends OptimizedDataJoinReducerBase {
 2 
 3   private TextTaggedOutputValue output = new TextTaggedOutputValue();
 4   private Text textOutput = new Text();
 5 
 6   @Override
 7   protected OutputValue combine(String key,
 8                                 OutputValue smallValue,
 9                                 OutputValue largeValue) {
10     if(smallValue == null || largeValue == null) {
11       return null;
12     }
13     Object[] values = {
14         smallValue.getData(), largeValue.getData()
15     };
16     textOutput.set(StringUtils.join(values, "\t"));
17     output.setData(textOutput);
18     return output;
19   }

 

最后,任务的主代码(driver code)须要指明InputFormat类,并设置次排序(Secondary sort)。

 

 1     job.setInputFormat(KeyValueTextInputFormat.class);
 2 
 3     job.setMapOutputKeyClass(CompositeKey.class);
 4     job.setMapOutputValueClass(TextTaggedOutputValue.class);
 5     job.setOutputKeyClass(Text.class);
 6     job.setOutputValueClass(Text.class);
 7 
 8     job.setPartitionerClass(CompositeKeyPartitioner.class);
 9     job.setOutputKeyComparatorClass(CompositeKeyComparator.class);
10     job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);

 

如今链接的准备工做就作完了,能够开始运行链接:

 

$ hadoop fs -put test-data/ch4/users.txt users.txt
$ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt
$ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output
$ hadoop fs -cat output/part*
bob 71 CA new_tweet 58.133.120.100
jim 21 OR logout 93.24.237.12
jim 21 OR new_tweet 93.24.237.12
jim 21 OR login 198.184.237.49
marie 27 OR login 58.133.120.100
marie 27 OR view_user 122.158.130.90
mike 69 VA new_tweet 87.124.79.252
mike 69 VA logout 55.237.104.36

 

若是和链接的源文件相对比,能够看到由于实现了一个内链接,输出中不包括用户anne,alison等不存在于日志文件中的记录。

 

小结:

这个链接的实现经过只缓存比较小的数据集来提升来Hadoop社区包的效率。可是,当数据从map阶段传输到reduce阶段的时候,仍然产生了很高的网络成本。

此外,Hadoop社区包支持多路链接,这里的实现只支持二路链接。

若是要更多地减小reduce端链接的内存足迹(memory footprint),一个简单的机制是在map函数中更多地进行投影操做。投影减小了map阶段的输出中的字段。例如:在分析用户数据的时候,若是只关注用户的年龄,那么在map任务中应该只投影(或输出)年龄字段,不考虑用户的其余的字段。这样就减小了map和reduce之间的网络负担,也减小了reduce在链接时的内存消耗。

和原始的社区包同样,这里的重分区的实现也支持过滤和投影。经过容许genMapOutputValue方法返回空值,就能够支持过滤。经过在genMapOutputValue方法中定义输出值的内容,就能够支持投影。

若是你既想输出全部的数据到reduce,又想避免排序的损耗,就须要考虑另外两种链接策略,复制链接和半链接。

相关文章
相关标签/搜索