Hive中的join操做原理和优化

2018-12-27 10:00:12数据库

Hive是基于Hadoop平台的,它提供了相似SQL同样的查询语言HQL。有了Hive,若是使用过SQL语言,而且不理解Hadoop MapReduce运行原理,也就没法经过编程来实现MR,可是你仍然能够很容易地编写出特定查询分析的HQL语句,经过使用相似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop可以理解的MR Job。
对于最基本的HQL查询咱们再也不累述,这里主要说明Hive中进行统计分析时使用到的JOIN操做。在说明Hive JOIN以前,咱们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助咱们理解HQL转换到底层的MR Job后是如何执行的。咱们重点说明MapReduce执行过程当中,从Map端到Reduce端这个过程(Shuffle)的执行状况,如图所示(来自《Hadoop: The Definitive Guide》):
hadoop-mr-shuffle
基本执行过程,描述以下:apache

  1. 一个InputSplit输入到map,会运行咱们实现的Mapper的处理逻辑,对数据进行映射操做。
  2. map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,能够经过io.sort.mb配置)。
  3. map自带的buffer使用容量达到必定门限(默认0.80或80%,能够经过io.sort.spill.percent配置),一个后台线程会准备将buffer中的数据写入到磁盘。
  4. 这个后台线程在将buffer中数据写入磁盘以前,会首先将buffer中的数据进行partition(分区,partition数为Reducer的个数),对于每一个的数据会基于Key进行一个in-memory排序。
  5. 排序后,会检查是否配置了Combiner,若是配置了则直接做用到已排序的每一个partition的数据上,对map输出进行化简压缩(这样写入磁盘的数据量就会减小,下降I/O操做开销)。
  6. 如今能够将通过处理的buffer中的数据写入磁盘,生成一个文件(每次buffer容量达到设置的门限,都会对应着一个写入到磁盘的文件)。
  7. map任务结束以前,会对输出的多个文件进行合并操做,合并成一个文件(若map输出至少3个文件,在多个文件合并后写入以前,若是配置了Combiner,则会运行来化简压缩输出的数据,文件个数能够经过min.num.splits.for.combine配置;若是指定了压缩map输出,这里会根据配置对数据进行压缩写入磁盘),这个文件仍然保持partition和排序的状态。
  8. reduce阶段,每一个reduce任务开始从多个map上拷贝属于本身partition(map阶段已经作好partition,并且每一个reduce任务知道应该拷贝哪一个partition;拷贝过程是在不一样节点之间,Reducer上拷贝线程基于HTTP来经过网络传输数据)。
  9. 每一个reduce任务拷贝的map任务结果的指定partition,也是先将数据放入到自带的一个buffer中(buffer默认大小为Heap内存的70%,能够经过mapred.job.shuffle.input.buffer.percent配置),若是配置了map结果进行压缩,则这时要先将数据解压缩后放入buffer中。
  10. reduce自带的buffer使用容量达到必定门限(默认0.66或66%,能够经过mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的输出的数量达到必定门限(默认1000,能够经过mapred.inmem.merge.threshold配置),buffer中的数据将会被写入到磁盘中。
  11. 在将buffer中多个map输出合并写入磁盘以前,若是设置了Combiner,则会化简压缩合并的map输出。
  12. 当属于该reducer的map输出所有拷贝完成,则会在reducer上生成多个文件,这时开始执行合并操做,并保持每一个map输出数据中Key的有序性,将多个文件合并成一个文件(在reduce端可能存在buffer和磁盘上都有数据的状况,这样在buffer中的数据能够减小必定量的I/O写入操做开销)。
  13. 最后,执行reduce阶段,运行咱们实现的Reducer中化简逻辑,最终将结果直接输出到HDFS中(由于Reducer运行在DataNode上,输出结果的第一个replica直接在存储在本地节点上)。

经过上面的描述咱们看到,在MR执行过程当中,存在Shuffle过程的MR须要在网络中的节点之间(Mapper节点和Reducer节点)拷贝数据,若是传输的数据量很大会形成必定的网络开销。并且,Map端和Reduce端都会经过一个特定的buffer来在内存中临时缓存数据,若是没法根据实际应用场景中数据的规模来使用Hive,尤为是执行表的JOIN操做,有可能很浪费资源,下降了系统处理任务的效率,还可能由于内存不足形成OOME问题,致使计算任务失败。
下面,咱们说明Hive中的JOIN操做,针对不一样的JOIN方式,应该如何来实现和优化:编程

生成一个MR Job缓存

多表链接,若是多个表中每一个表都使用同一个列进行链接(出如今JOIN子句中),则只会生成一个MR Job,例如:网络

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

三个表a、b、c都分别使用了同一个字段进行链接,亦即同一个字段同时出如今两个JOIN子句中,从而只生成一个MR Job。app

生成多个MR Jobide

多表链接,若是多表中,其中存在一个表使用了至少2个字段进行链接(同一个表的至少2个列出如今JOIN子句中),则会至少生成2个MR Job,例如:oop

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key2)

三个表基于2个字段进行链接,这两个字段b.key1和b.key2同时出如今b表中。链接的过程是这样的:首先a和b表基于a.key和b.key1进行链接,对应着第一个MR Job;表a和b链接的结果,再和c进行链接,对应着第二个MR Job。优化

表链接顺序优化ui

多表链接,会转换成多个MR Job,每个MR Job在Hive中称为JOIN阶段(Stage)。在每个Stage,按照JOIN顺序中的最后一个表应该尽可能是大表,由于JOIN前一阶段生成的数据会存在于Reducer的buffer中,经过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据多是JOIN顺序中,前面表链接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行链接时,只须要从buffer中读取缓存的Key,与大表中的指定Key进行链接,速度会更快,也可能避免内存缓冲区溢出。例如:

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

这个JOIN语句,会生成一个MR Job,在选择JOIN顺序的时候,数据量相比应该是b < c,表a和b基于a.key = b.key1进行链接,获得的结果(基于a和b进行链接的Key)会在Reducer上缓存在buffer中,在与c进行链接时,从buffer中读取Key(a.key=b.key1)来与表c的c.key进行链接。
另外,也能够经过给出一些Hint信息来启发JOIN操做,这指定了将哪一个表做为大表,从而获得优化。例如:

1 SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON(c.key = b.key1)

上述JOIN语句中,a表被视为大表,则首先会对表b和c进行JOIN,而后再将获得的结果与表a进行JOIN。

基于条件的LEFT OUTER JOIN优化

左链接时,左表中出现的JOIN字段都保留,右表没有链接上的都为空。对于带WHERE条件的JOIN语句,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN ON (a.key=b.key)
2 WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'

执行顺序是,首先完成2表JOIN,而后再经过WHERE条件进行过滤,这样在JOIN过程当中可能会输出大量结果,再对这些结果进行过滤,比较耗时。能够进行优化,将WHERE条件放在ON后,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN b
2 ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')

这样,在JOIN的过程当中,就对不知足条件的记录进行了预先过滤,可能会有更好的表现。

左半链接(LEFT SEMI JOIN)

左半链接实现了相似IN/EXISTS的查询语义,使用关系数据库子查询的方式实现查询SQL,例如:

1 SELECT a.key, a.value FROM WHERE a.key IN (SELECT b.key FROM b);

使用Hive对应于以下语句:

1 SELECT a.key, a.val FROM LEFT SEMI JOIN ON (a.key = b.key)

须要注意的是,在LEFT SEMI JOIN中,表b只能出如今ON子句后面,不可以出如今SELECT和WHERE子句中。
关于子查询,这里提一下,Hive支持状况以下:

  • 在0.12版本,只支持FROM子句中的子查询;
  • 在0.13版本,也支持WHERE子句中的子查询。

Map Side JOIN

Map Side JOIN优化的出发点是,Map任务输出后,不须要将数据拷贝到Reducer节点,下降的数据在网络节点之间传输的开销。
多表链接,若是只有一个表比较大,其余表都很小,则JOIN操做会转换成一个只包含Map的Job,例如:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

对于表a数据的每个Map,都可以彻底读取表b的数据。这里,表a与b不容许执行FULL OUTER JOIN、RIGHT OUTER JOIN。

BUCKET Map Side JOIN

咱们先看两个表a和b的DDL,表a为:

1 CREATE TABLE a(key INT, othera STRING)
2 CLUSTERED BY(keyINTO 4 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY '\001'
5 COLLECTION ITEMS TERMINATED BY '\002'
6 MAP KEYS TERMINATED BY '\003'
7 STORED AS SEQUENCEFILE;

表b为:

1 CREATE TABLE b(key INT, otherb STRING)
2 CLUSTERED BY(keyINTO 32 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY '\001'
5 COLLECTION ITEMS TERMINATED BY '\002'
6 MAP KEYS TERMINATED BY '\003'
7 STORED AS SEQUENCEFILE;

如今要基于a.key和b.key进行JOIN操做,此时JOIN列同时也是BUCKET列,JOIN语句以下:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

而且表a有4个BUCKET,表b有32个BUCKET,默认状况下,对于表a的每个BUCKET,都会去获取表b中的每个BUCKET来进行JOIN,这回形成必定的开销,由于只有表b中知足JOIN条件的BUCKET才会真正与表a的BUCKET进行链接。
这种默认行为能够进行优化,经过改变默认JOIN行为,只须要设置变量:

1 set hive.optimize.bucketmapjoin = true

这样,JOIN的过程是,表a的BUCKET 1只会与表b中的BUCKET 1进行JOIN,而再也不考虑表b中的其余BUCKET 2~32。
若是上述表具备相同的BUCKET,如都是32个,并且仍是排序的,亦即,在表定义中在CLUSTERED BY(key)后面增长以下约束:

1 SORTED BY(key)

则上述JOIN语句会执行一个Sort-Merge-Bucket (SMB) JOIN,一样须要设置以下参数来改变默认行为,优化JOIN时只遍历相关的BUCKET便可:

1 set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
2 set hive.optimize.bucketmapjoin = true;
3 set hive.optimize.bucketmapjoin.sortedmerge = true;
相关文章
相关标签/搜索