SQL on Hadoop中用到的主要技术——MPP vs Runtime Framework

转载声明

本文转载自盘点SQL on Hadoop中用到的主要技术,我的以为该文章对于诸如Impala这样的MPP架构的SQL引擎和Runtime Framework架构的Hive/Spark SQL进行对比,感受总结的特别好,而且和本人最近的公司相近,学习转载之。node

自hive出现以后,通过几年的发展,SQL on Hadoop相关的系统已经百花齐放,速度愈来愈快,功能也愈来愈齐全。本文并非要去比较所谓“交互式查询哪家强”,而是试图梳理出一个统一的视角,来看看各家系统有哪些技术上相通之处。mysql

考虑到系统使用的普遍程度与成熟度,在具体举例时通常会拿Hive和Impala为例,固然在调研的过程当中也会涉及到一些其余系统,如Spark SQL,Presto,TAJO等。而对于hawq这样的商业产品和apache drill这样成熟度还不是很高的开源方案就不作过多了解了。git

系统架构

runtime framework v.s. mpp

在SQL on Hadoop系统中,有两种架构,一种是基于某个运行时框架来构建查询引擎,典型案例是Hive;另外一种是仿照过去关系数据库的MPP架构。前者现有运行时框架,而后套上sql层,后者则是从头打造一个一体化的查询引擎。有时咱们能听到一种声音,说后者的架构优于前者,至少在性能上。那么是否果然如此?github

通常来讲,对于SQL on Hadoop系统很重要的一个评价指标就是:快。后面提到的全部内容也大可能是为了查询速度更快。在Hive逐渐普及以后,就逐渐有了所谓交互式查询的需求,由于不管是BI系统,仍是adhoc,都不能按照离线那种节奏玩。这时候不管是有实力的大公司(好比Facebook),仍是专业的供应商(好比Cloudera),都试图去解决这个问题。短时间能够靠商业方案或者关系数据库去支撑一下,可是长远的解决方案就是参考过去的MPP数据库架构打造一个专门的系统,因而就有了Impala,Presto等等。从任务执行的角度说,这类引擎的任务执行其实跟DAG模型是相似的,当时也有Spark这个DAG模型的计算框架了,但这终究是别人家的孩子,并且往Spark上套sql又是Hive的那种玩法了。因而在Impala问世以后就强调本身“计算所有在内存中完成”,性能也是各类碾压当时还只有MR做为计算模型的Hive。那么Hive所表明的“基于已有的计算模型”方式是否真的不行?算法

不能否认,按照这种方式去比较,那么类MPP模式确实有不少优点:sql

  • DAG v.s. MR:最主要的优点,中间结果不写磁盘(除非内存不够),一鼓作气。
  • 流水线计算:上游stage一出结果立刻推送或者拉到下一个stage处理,好比多表join时前两个表有结果直接给第三个表,不像MR要等两个表彻底join完再给第三个表join。
  • 高效的IO:本地查询没有多余的消耗,充分利用磁盘。这个后面细说。
  • 线程级别的并发:相比之下MR每一个task要启动JVM,自己就有很大延迟,占用资源也多。

固然MPP模式也有其劣势,一个是扩展性不是很高,这在关系数据库时代就已经有过结论;另外一个是容错性差,对于Impala来讲一旦运行过程当中出点问题,整个查询就挂了。数据库

可是,通过不断的发展,Hive也能跑在DAG框架上了,不只有Tez,还有Spark。上面提到的一些劣势,其实大都也能够在计算模型中解决,只不过考虑到计算模型的通用性和自己的设计目标,不会去专门知足(因此若是从这个角度分类,Impala属于“专用系统”,Spark则属于“通用系统”)。在最近Cloudera作的benchmark中,虽然Impala仍然一路领先,可是基于Spark的Spark SQL彻底不逊色于Presto,基于Tez的Hive也不算不好,至少在多用户并发模式下能超过Presto,足见MPP模式并非绝对占上风的。因此这种架构上的区别在我看来并非制胜的关键,至少不是惟一的因素,真正要作到快速查询,各个方面的细节都要有所把握。后面说的都是这些细节。express

核心组件

无论是上面提到的那种架构,一个SQL on Hadoop系统通常都会有一些通用的核心组件,这些组件根据设计者的考虑放在不一样的节点角色中,在物理上节点都按照master/worker的方式去作,若是master压力太大,一些原本适合放在master上的组件能够放到一个辅助master上。apache

  • UI层负责提供用户输入查询的接口。通常有Web/GUI,命令行,编程方式3类。
  • QL层负责把用户提交的查询解析成能够运行的执行计划(好比MR Job)。这部分在后面会专门提到。
  • 执行层就是运行具体的Job。通常会有一个master负责query的运行管理,好比申请资源,观察进度等等,同时master也负责最终聚合局部结果到全局结果。而每一个节点上会有相应的worker作本地计算。
  • IO层提供与存储层交互的接口。对于HDFS来讲,须要根据I/O Format把文件转换成K/V,Serde再完成K/V到数据行的映射。对于非HDFS存储来讲就须要一些专门的handler/connector。
  • 存储层通常是HDFS,但也有能够查询NoSQL,或者关系数据库的。
  • 系统另外还须要一个元数据管理服务,管理表结构等。

这里写图片描述

执行计划

编译流程

从SQL到执行计划,大体分为5步。编程

  • 第一步将SQL转换成抽象语法树AST。这一步通常都有第三方工具库能够完成,好比antlr。
  • 第二步对AST进行语义分析,好比表是否存在,字段是否存在,SQL语义是否有误(好比select中被断定为聚合的字段在group by中有没有出现)。
  • 第三步生成逻辑执行计划,这是一个由逻辑操做符组成的DAG。好比对于Hive来讲扫表会产生TableScanOperator,聚合会产生GroupByOperator。对于类MPP系统来讲,状况稍微有点不一样。逻辑操做符的种类仍是差很少,可是会先生成单机版本,而后生成多机版本。多机版本主要是把aggregate,join,还有top n这几个操做并行化,好比aggregate会分红相似MR那样的本地aggregate,shuffle和全局aggregate三步。
  • 第四步作逻辑执行计划作优化,这步在下面单独介绍。
  • 第五步把逻辑执行计划转换成能够在机器上运行的物理计划。对于Hive来讲,就是MR/Tez Job等;对于Impala来讲,就是plan fragment。其余类MPP系统也是相似的概念。物理计划中的一个计算单元(或者说Job),有“输入,处理,输出”三要素组成,而逻辑执行计划中的operator相对粒度更细,一个逻辑操做符通常处于这三要素之一的角色。

下面分别举两个例子,直观的认识下sql、逻辑计划、物理计划之间的关系,具体解释各个operator的话会比较细碎,就不展开了。

Hive on MR

select count(1) from status_updates where ds = '2009-08-01'
  • 1
  • 2

这里写图片描述

Presto

引用自美团技术团队,其中SubPlan就是物理计划的一个计算单元

select c1.rank, count(*) 
from dim.city c1 join dim.city c2 on c1.id = c2.id 
where c1.id > 10 group by c1.rank limit 10;
  • 1
  • 2
  • 3
  • 4

这里写图片描述

优化器

关于执行计划的优化,虽然不必定是整个编译流程中最难的部分,但倒是最有看点的部分,并且目前还在不断发展中。Spark系之因此放弃Shark另起炉灶作Spark SQL,很大一部分缘由是想本身作优化策略,避免受Hive的限制,为此还专门独立出优化器组件Catalyst(固然Spark SQL目前仍是很是新,其将来发展给人很多想象空间)。总之这部分工做能够不断的创新,优化器越智能,越傻瓜化,用户就越能解放出来解决业务问题。

早期在Hive中只有一些简单的规则优化,好比谓词下推(把过滤条件尽量的放在table scan以后就完成),操做合并(连续的filter用and合并成一个operator,连续的projection也能够合并)。后来逐渐增长了一些略复杂的规则,好比相同key的join + group by合并为1个MR,还有star schema join。在Hive 0.12引入的相关性优化(correlation optimizer)算是规则优化的一个高峰,他可以减小数据的重复扫描,具体来讲,若是查询的两个部分用到了相同的数据,而且各自作group by / join的时候用到了相同的key,这个时候因为数据源和shuffle的key是同样的,因此能够把原来须要两个job分别处理的地方合成一个job处理。 
好比下面这个sql:

SELECT 
    sum(l_extendedprice) / 7.0 as avg_yearly 
FROM 
     (SELECT l_partkey, l_quantity, l_extendedprice 
      FROM lineitem JOIN part ON (p_partkey=l_partkey) 
      WHERE p_brand='Brand#35' AND p_container = 'MED PKG')touter 
JOIN 
     (SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq 
      FROM lineitem GROUP BY l_partkey) tinner 
ON (touter.l_partkey = tinnter.lp) 
WHERE touter.l_quantity < tinner.lq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这个查询中两次出现lineitem表,group by和两处join用的都是l_partkey,因此原本两个子查询和一个join用到三个job,如今只须要用到一个job就能够完成。

这里写图片描述

可是,基于规则的优化(RBO)不能解决全部问题。在关系数据库中早有另外一种优化方式,也就是基于代价的优化CBO。CBO经过收集表的数据信息(好比字段的基数,数据分布直方图等等)来对一些问题做出解答,其中最主要的问题就是肯定多表join的顺序。CBO经过搜索join顺序的全部解空间(表太多的状况下能够用有限深度的贪婪算法),而且算出对应的代价,能够找到最好的顺序。这些都已经在关系数据库中获得了实践。

目前Hive已经启动专门的项目,也就是Apache Optiq来作这个事情,而其余系统也没有作的很好的CBO,因此这块内容还有很大的进步空间。

执行效率

即便有了高效的执行计划,若是在运行过程自己效率较低,那么再好的执行计划也会大打折扣。这里主要关注CPU和IO方面的执行效率。

CPU

在具体的计算执行过程当中,低效的cpu会致使系统的瓶颈落在CPU上,致使IO没法充分利用。在一项针对Impala和Hive的对比时发现,Hive在某些简单查询上(TPC-H Query 1)也比Impala慢主要是由于Hive运行时彻底处于CPU bound的状态中,磁盘IO只有20%,而Impala的IO至少在85%。

在SQL on Hadoop中出现CPU bound的主要缘由有如下几种:

  • 大量虚函数调用:这个问题在多处出现,好比对于a + 2 * b之类的表达式计算,解释器会构造一个expression tree,解释的过程就是递归调用子节点作evaluation的过程。又好比以DAG形式的operator/task在执行的过程当中,上游节点会层层调用下游节点来获取产生的数据。这些都会产生大量的调用。
  • 类型装箱:因为表达式解释器须要对不一样数据类型的变量作解释,因此在Java中须要把这些原本是primitive的变量包装成Object,累积起来也消耗很多资源。这算是上面一个问题附带出来的。
  • branch instruction: 如今的CPU都是有并行流水线的,可是若是出现条件判断会致使没法并行。这种状况可能出如今判断数据的类型(是string仍是int),或者在判断某一列是否由于其余字段的过滤条件致使本行不须要被读取(列存储状况下)。
  • cache miss:每次处理一行数据的方式致使cpu cache命中率不高。(这么说已经暗示了解决方案)

针对上面的问题,目前大多数系统中已经加入了如下两个解决办法中至少一个。

一个方法是动态代码生成,也就是不使用解释性的统一代码。好比a + 2 * b这个表达式就会生成对应的执行语言的代码,并且能够直接用primitive type,而不是用固定的解释性代码。具体实现来讲,JVM系的如Spark SQL,Presto能够用反射,C++系的Impala则使用了llvm生成中间码。对于判断数据类型形成的分支判断,动态代码的效果能够消除这些类型判断,还能够展开循环,能够对比下面这段代码,左边是解释性代码,右边是动态生成代码。

这里写图片描述

另外一个方法是vectorization(向量化),基本思路是放弃每次处理一行的模式,改用每次处理一小批数据(好比1k行),固然前提条件是使用列存储格式。这样一来,这一小批连续的数据能够放进cache里面,cpu不只减小了branch instruction,甚至能够用SIMD加快处理速度。具体的实现参考下面的代码,对一个long型的字段增长一个常量。经过把数据表示成数组,过滤条件也用selVec装进数组,造成了很紧凑的循环:

add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec) 
{   
  if (selVec == null)   
     for (int i = 0; i < vecNum; i++) 
         result[i] = col1[i] + col2[i];
  else 
     for (int i = 0; i < vecNum; i++) 
     {
         int selIdx = selVec[i];
         result[selIdx] = col1[selIdx] + col2[selIdx];
     }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

IO

因为SQL on Hadoop存储数据都是在HDFS上,因此IO层的优化其实大多数都是HDFS的事情,各大查询引擎则提出需求去进行推进。要作到高效IO,一方面要低延迟,屏蔽没必要要的消耗;另外一方面要高吞吐,充分利用每一块磁盘。目前与这方面有关的特性有:

  • short-circuit local reads:当发现读取的数据是本地数据时,不走DataNode(由于要走一次socket链接),而是用DFS Client直接读本地的block replica。HDFS参数是dfs.client.read.shortcircuit和dfs.domain.socket.path。
  • zero copy:避免数据在内核buffer和用户buffer之间反复copy,在早期的HDFS中已经有这个默认实现。
  • disk-aware scheduling:经过知道每一个block所在磁盘,能够在调度cpu资源时让不一样的cpu读不一样的磁盘,避免查询内和查询间的IO竞争。HDFS参数是dfs.datanode.hdfs-blocks-metadata.enabled。

存储格式

对于分析类型的workload来讲,最好的存储格式天然是列存储,这已经在关系数据库时代获得了证实。目前hadoop生态中有两大列存储格式,一个是由Hortonworks和Microsoft开发的ORCFile,另外一个是由Cloudera和Twitter开发的Parquet。

ORCFile顾名思义,是在RCFile的基础之上改造的。RCFile虽然号称列存储,可是只是“按列存储”而已,将数据先划分红row group,而后row group内部按照列进行存储。这其中没有列存储的一些关键特性,而这些特性在之前的列式数据库中(好比我之前用过的Infobright)早已用到。好在ORCFile已经弥补了这些特性,包括:

  • 块过滤与块统计:每一列按照固定行数或大小进一步切分,对于切分出来的每个数据单元,预先计算好这些单元的min/max/sum/count/null值,min/max用于在过滤数据的时候直接跳过数据单元,而全部这些统计值则能够在作聚合操做的时候直接采用,而没必要解开这个数据单元作进一步的计算。
  • 更高效的编码方式:RCFile中没有标注每一列的类型,事实上当知道数据类型时,能够采起特定的编码方式,自己就能很大程度上进行数据的压缩。常见的针对列存储的编码方式有RLE(大量重复数据),字典(字符串),位图(数字且基数不大),级差(排序过的数据,好比日志中用户访问时间)等等。

ORCFile的结构以下图,数据先按照默认256M分为row group,也叫strip。每一个strip配一个index,存放每一个数据单元(默认10000行)的min/max值用于过滤;数据按照上面提到的编码方式序列化成stream,而后再进行snappy或gz压缩。footer提供读取stream的位置信息,以及更多的统计值如sum/count等。尾部的file footer和post script提供全局信息,如每一个strip的行数,各列数据类型,压缩参数等。

这里写图片描述

Parquet的设计原理跟ORC相似,不过它有两个特色:

  • 通用性:相比ORCFile专门给Hive使用而言,Parquet不只仅是给Impala使用,还能够给其余查询工具使用,如Hive、Pig,进一步还能对接avro/thrift/pb等序列化格式。
  • 基于Dremel思想的嵌套格式存储:关系数据库设计模式中反对存储复杂格式(违反第一范式),可是如今的大数据计算不只出现了这种需求(半结构化数据),也可以高效的实现存储和查询效率,在语法上也有相应的支持(各类UDF,Hive的lateral view等)。Google Dremel就在实现层面作出了范例,Parquet则彻底仿照了Dremel。

对嵌套格式作列存储的难点在于,存储时须要标记某个数据对应于哪个存储结构,或者说是哪条记录,因此须要用数据清楚的进行标记。 在Dremel中提出用definition level和repetition level来进行标记。definition level指的是,这条记录在嵌套结构中所处于第几层,而repetition level指的是,这条记录相对上一条记录,在第几层重复。好比下图是一个二级嵌套数组。图中的e跟f在都属于第二层的重复记录(同一个level2),因此f的r值为2,而c跟d则是不一样的level2,但属于同一个level1,因此d的r值为1。对于顶层而言(新的一个嵌套结构),r值就为0。

这里写图片描述

可是仅仅这样还不够。上图说明了r值的做用,可是尚未说明d值的做用,由于按照字面解释,d值对于每个字段都是能够根据schema获得的,那为何还要从行记录级别标记?这是由于记录中会插入一些null值,这些null值表明着他们“能够存在”可是由于是repeated或者是optional因此没有值的状况,null值是用来占位的(或者说是“想象”出来的),因此他们的值须要单独计算。null的d值就是说这个结构往上追溯到哪一层(不包括平级)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一个null在code = en所在的结构里面,那么language不是null(不考虑code,他跟country平级),他就是第二层;又好比country的第二个null在url = http://B 所在的结构里面,那么name不是null(不考虑url,由于他跟原本就是null的language平级),因此就是第一层。

这里写图片描述

这里写图片描述

经过这种方式,就对一个树状的嵌套格式完成了存储。在读取的时候能够经过构造一个状态机进行遍历。

有意思的是,虽然parquet支持嵌套格式,可是Impala尚未来得及像Hive那样增长array,map,struct等复杂格式,固然这项功能已经被列入roadmap了,相信不久就会出现。

在最近咱们作的Impala2.0测试中,顺便测试了存储格式的影响。parquet相比sequencefile在压缩比上达到1:5,查询性能也相差5-10倍,足见列存储一项就给查询引擎带来的提高。

资源控制

运行时资源调整

对于一个MR Job,reduce task的数量一直是须要人为估算的一个麻烦事,基于MR的Hive也只是根据数据源大小粗略的作估计,不考虑具体的Job逻辑。可是在以后的框架中考虑到了这个状况,增长了运行时调整资源分配的功能。Tez中引入了vertex manager,能够根据运行时收集到的数据智能的判断reduce动做须要的task。相似的功能在TAJO中也有提到,叫progressive query optimization,并且TAJO不只能作到动态调整task数量,还能动态调整join顺序。

资源集成

在Hadoop已经进入2.x的时代,全部想要获得普遍应用的SQL on Hadoop系统势必要能与YARN进行集成。虽然这是一个有利于资源合理利用的好事,可是因为加入了YARN这一层,却给系统的性能带来了必定的障碍,由于启动AppMaster和申请container也会占用很多时间,尤为是前者,并且container的供应若是时断时续,那么会极大的影响时效性。在Tez和Impala中对这些问题给出了相应的解决办法:

  • AppMaster启动延迟的问题,采起long lived app master,AppMaster启动后长期驻守,而非像是MR那样one AM per Job。具体实现时,能够给fair scheduler或capacity scheduler配置的每一个队列配上一个AM池,有必定量的AM为提交给这个队列的任务服务。
  • container供应的问题,在Tez中采起了container复用的方式,有点像jvm复用,即container用完之后不立刻释放,等一段时间,实在是没合适的task来接班了再释放,这样不只减小container断供的可能,并且能够把上一个task留下的结果cache住给下一个task复用,好比作map join;Impala则采起比较激进的方式,一次性等全部的container分配到位了才开始执行查询,这种方式也能让它的流水线式的计算不至于阻塞。

其余

到这里为止,已经从上到下顺了一遍各个层面用到的技术,固然SQL on Hadoop自己就至关复杂,涉及到方方面面,时间精力有限不可能一一去琢磨。好比其余一些具备技术复杂度的功能有:

  • 多数据源查询:Presto支持从mysql,cassandra,甚至kafka中去读取数据,这就大大减小了数据整合时间,不须要放到HDFS里才能查询。Impala和Hive也支持查询hbase。Spark SQL也在1.2版本开始支持External Datasource。国内也有相似的工做,如秒针改造Impala使之能查询postgres。
  • 近似查询:count distinct(基数估计)一直是sql性能杀手之一,若是能接受必定偏差的话能够采用近似算法。Impala中已经实现了近似算法(ndv),Presto则是请blinkDB合做完成。二者都是采用了HyperLogLog Counting。固然,不只仅是count distinct可使用近似算法,其余的如取中位数之类的也能够用。

结束语

尽管如今相关系统已经不少,也通过了几年的发展,可是目前各家系统仍然在不断的进行完善,好比:

  • 增长分析函数,复杂数据类型,SQL语法集的扩展。
  • 对于已经成形的技术也在不断的改进,如列存储还能够增长更多的encoding方式。
  • 甚至对于像CBO这样的领域,开源界拿出来的东西还算是刚刚起步,相比HAWQ中的ORCA这种商业系统提供的优化器还差的不少。

毕竟相比已经比较成熟的关系数据库,分布式环境下须要解决的问题更多,将来必定还会出现不少精彩的技术实践,让咱们在海量数据中更快更方便的查到想要的数据。

相关文章
相关标签/搜索