Hive Join Strategies hive的链接策略

Common Join

最为普通的join策略,不受数据量的大小影响,也能够叫作reduce side join ,最没效率的一种join 方式. 它由一个mapreduce job 完成. 算法

首先将大表和小表分别进行map 操做, 在map shuffle 的阶段每个map output key 变成了table_name_tag_prefix + join_column_value , 可是在进行partition 的时候它仍然只使用join_column_value 进行hash. apache

每个reduce 接受全部的map 传过来的split , 在reducce 的shuffle 阶段,它将map output key 前面的table_name_tag_prefix 给舍弃掉进行比较. 由于reduce 的个数能够由小表的大小进行决定,因此对于每个节点的reduce 必定能够将小表的split 放入内存变成hashtable. 而后将大表的每一条记录进行一条一条的比较. 缓存

真正的Join在reduce阶段app

MapJoin

Map Join 的计算步骤分两步,将小表的数据变成hashtable广播到全部的map 端,将大表的数据进行合理的切分,而后在map 阶段的时候用大表的数据一行一行的去探测(probe) 小表的hashtable. 若是join key 相等,就写入HDFS. 分布式

map join 之因此叫作map join 是由于它全部的工做都在map 端进行计算. ide

hive 在map join 上作了几个优化: oop

hive 0.6 的时候默认认为写在select 后面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示进行设定. hive 0.7 的时候这个计算是自动化的,它首先会自动判断哪一个是小表,哪一个是大表,这个参数由(hive.auto.convert.join=true)来控制. 而后控制小表的大小由(hive.smalltable.filesize=25000000L)参数控制(默认是25M),当小表超过这个大小,hive 会默认转化成common join. 你能够查看HIVE-1642. 性能

首先小表的Map 阶段它会将本身转化成MapReduce Local Task ,而后从HDFS 取小表的全部数据,将本身转化成Hashtable file 并压缩打包放入DistributedCache 里面. 优化

目前hive 的map join 有几个限制,一个是它打算用BloomFilter 来实现hashtable , BloomFilter 大概比hashtable 省8-10倍的内存, 可是BloomFilter 的大小比较难控制. ui

如今DistributedCache 里面hashtable默认的复制是3份,对于一个有1000个map 的大表来讲,这个数字过小,大多数map 操做都等着DistributedCache 复制.

优化后的map-join

Converting Common Join into Map Join

判断谁是大表谁是小表(小表的标准就是size小于hive.mapjoin.smalltable.filesize的值)

Hive在Compile阶段的时候对每个common join会生成一个conditional task,而且对于每个join table,会假设这个table是大表,生成一个mapjoin task,而后把这些mapjoin tasks装进

conditional task(List<Task<? extends Serializable>> resTasks),同时会映射大表的alias和对应的mapjoin task。在runtime运行时,resolver会读取每一个table alias对应的input file size,若是小表的file size比设定的threshold要低 (hive.mapjoin.smalltable.filesize,默认值为25M),那么就会执行converted mapjoin task。对于每个mapjoin task同时会设置一个backup task,就是先前的common join task,一旦mapjoin task执行失败了,则会启用backup task

Performance Bottleneck

性能瓶颈

一、Distributed Cache is the potential performance bottleneck

分布式缓存是一个潜在的性能瓶颈

A、Large hashtable file will slow down the propagation of Distributed Cache

大的hashtable文件将会减速分布式缓存的传播

B、Mappers are waiting for the hashtables file from Distributed Cache

Mapper排队等待从分布式缓存获取hashtables(由于默认一个hashtable缓存是三份,若是mappers数量太多须要一个一个的等待)

二、Compress and archive all the hashtable file into a tar file.

压缩和归档全部的hashtable文件为一个tar文件。

Bucket Map Join

Why:

Total table/partition size is big, not good for mapjoin.

How:

set hive.optimize.bucketmapjoin = true;

1. Work together with map join

2. All join tables are bucketized, and each small tableʼs bucket number can be divided by big tableʼs bucket number.

全部join的表是bucketized而且小表的bucket数量是大表bucket数量的整数倍

3. Bucket columns == Join columns

hive 建表的时候支持hash 分区经过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字.

当链接的两个表的join key 就是bucket column 的时候,就能够经过

hive.optimize.bucketmapjoin= true

来控制hive 执行bucket map join 了, 须要注意的是你的小表的number_buckets 必须是大表的倍数. 不管多少个表进行链接这个条件都必须知足.(其实若是都按照2的指数倍来分bucket, 大表也能够是小表的倍数,不过这中间须要多计算一次,对int 有效,long 和string 不清楚)

Bucket Map Join 执行计划分两步,第一步先将小表作map 操做变成hashtable 而后广播到全部大表的map端,大表的map端接受了number_buckets 个小表的hashtable并不须要合成一个大的hashtable,直接能够进行map 操做,map 操做会产生number_buckets 个split,每一个split 的标记跟小表的hashtable 标记是同样的, 在执行projection 操做的时候,只须要将小表的一个hashtable 放入内存便可,而后将大表的对应的split 拿出来进行判断,因此其内存限制为小表中最大的那个hashtable 的大小.

Bucket Map Join 同时也是Map Side Join 的一种实现,全部计算都在Map 端完成,没有Reduce 的都被叫作Map Side Join ,Bucket 只是hive 的一种hash partition 的实现,另一种固然是值分区.

create table a (xxx) partition by (col_name)

不过通常hive 中两个表不必定会有同一个partition key, 即便有也不必定会是join key. 因此hive 没有这种基于值的map side join, hive 中的list partition 主要是用来过滤数据的而不是分区. 两个主要参数为(hive.optimize.cp = true 和 hive.optimize.pruner=true)

hadoop 源代码中默认提供map side join 的实现, 你能够在hadoop 源码的src/contrib/data_join/src 目录下找到相关的几个类. 其中TaggedMapOutput 便可以用来实现hash 也能够实现list , 看你本身决定怎么分区. Hadoop Definitive Guide 第8章关于map side join 和side data distribution 章节也有一个例子示例怎样实现值分区的map side join.

上图解释:b表是大表,a,c是小表而且都是整数倍,将a,c表加入内存先join而后到每一个b表的map去作匹配。

Sort Merge Bucket Map Join

Why:

No limit on file/partition/table size.

How:

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

1.Work together with bucket map join

将bucket加入到map join中

2.Bucket columns == Join columns == sort columns

Bucket Map Join 并无解决map join 在小表必须彻底装载进内存的限制, 若是想要在一个reduce 节点的大表和小表都不用装载进内存,必须使两个表都在join key 上有序才行,你能够在建表的时候就指定sorted byjoin key 或者使用index 的方式.

作法仍是两边要作hash bucket,并且每一个bucket内部要进行排序。这样一来当两边bucket要作局部join的时候,只须要用相似merge sort算法中的merge操做同样把两个bucket顺序遍历一遍便可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提高会有很大帮助。

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

Bucket columns == Join columns == sort columns

这样小表的数据能够每次只读取一部分,而后仍是用大表一行一行的去匹配,这样的join 没有限制内存的大小. 而且也能够执行全外链接.

Skew Join

Join bottlenecked on the reducer who gets the

skewed key

set hive.optimize.skewjoin = true;

set hive.skewjoin.key = skew_key_threshold

相关文章
相关标签/搜索