13 - Hive's Execution Process

1. The Hive-to-MapReduce Conversion: Logical Analysis

1.1 How Join Is Implemented

select u.name, o.orderid from order o join user u on o.uid = u.uid;

In the map output value, rows from the two tables are tagged with different markers; in the reduce stage the tag tells which table each row came from. The MapReduce flow is shown below (this describes only the most basic join implementation; other strategies exist).

[figure: MapReduce flow of the reduce-side join]
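To make the tagging concrete, below is a minimal Python sketch that simulates the map, shuffle, and reduce steps of this join. The sample rows, the shuffle helper, and the tag values 0/1 are illustrative assumptions, not Hive's actual code.

from itertools import groupby

# Toy rows for the two tables in the query above (assumed sample data).
order_rows = [{"orderid": 1, "uid": 10}, {"orderid": 2, "uid": 11}]
user_rows = [{"uid": 10, "name": "alice"}, {"uid": 11, "name": "bob"}]

def map_phase():
    """Emit (join key, (tag, payload)); the tag records the source table."""
    for r in order_rows:
        yield r["uid"], (0, r["orderid"])   # tag 0 = order table
    for r in user_rows:
        yield r["uid"], (1, r["name"])      # tag 1 = user table

def shuffle(pairs):
    """Stand-in for MapReduce's sort/shuffle: group values by key."""
    for key, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield key, [v for _, v in group]

def reduce_phase(key, values):
    """Split the value list by tag, then join the two sides."""
    orders = [p for tag, p in values if tag == 0]
    names = [p for tag, p in values if tag == 1]
    for name in names:
        for orderid in orders:
            yield name, orderid

for uid, values in shuffle(map_phase()):
    for row in reduce_phase(uid, values):
        print(row)   # ('alice', 1) then ('bob', 2)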

1.2 How GROUP BY Is Implemented

select rank, isonline, count(*) from city group by rank, isonline;

The GROUP BY columns are combined into the map output key. MapReduce's sort then brings identical keys together, and the reduce stage keeps track of the last key seen (LastKey) to tell different keys apart.

For GROUP BY, the reducer aggregates the list of values that arrive under the same key (a combiner can run on the map side to pre-aggregate and cut shuffle volume).

[figure: MapReduce flow of GROUP BY]
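The same idea as a minimal Python sketch, with assumed toy data for city: the map key is the (rank, isonline) pair, the shuffle groups identical keys, and the reducer sums the partial counts. The optional combiner shown here pre-aggregates on the map side, which is what cuts the shuffle volume.

from collections import Counter
from itertools import groupby

# Assumed toy rows (rank, isonline) for table city.
city_rows = [("A", 1), ("A", 1), ("A", 0), ("B", 1)]

def map_phase(rows):
    """Key = the GROUP BY columns combined; value = a partial count of 1."""
    for rank, isonline in rows:
        yield (rank, isonline), 1

def combine(pairs):
    """Optional map-side combiner: pre-aggregates counts before the shuffle."""
    partial = Counter()
    for key, n in pairs:
        partial[key] += n
    return partial.items()

def shuffle(pairs):
    """Stand-in for the sort/shuffle: group partial counts by key."""
    for key, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield key, [n for _, n in group]

# Reduce: sum the partial counts arriving under the same key.
for key, counts in shuffle(combine(map_phase(city_rows))):
    print(key, sum(counts))   # ('A', 0) 1 / ('A', 1) 2 / ('B', 1) 1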

1.3 How DISTINCT Is Implemented

select dealid, count(distinct uid) num from order group by dealid;

When there is only one DISTINCT column (and ignoring the map-side hash GROUP BY optimization), it is enough to combine the GROUP BY column and the DISTINCT column into the map output key, let MapReduce sort, and use the GROUP BY column alone as the reduce (partition) key; tracking LastKey in the reduce stage then completes the deduplication.

Unlike a plain GROUP BY, only one value is kept from each run of identical keys in the list that reaches the reducer.

[figure: MapReduce flow of single-column DISTINCT]
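Again as a sketch with assumed toy data: the sort key is (dealid, uid), so duplicate uids for a dealid arrive adjacent to each other, and a single LastKey variable replaces an in-memory hash set.

from itertools import groupby

# Assumed toy rows (dealid, uid) for table order.
order_rows = [(1, 10), (1, 10), (1, 11), (2, 10)]

# Map output key = (GroupBy column, Distinct column); partitioning uses
# dealid alone, so one reducer sees all uids of a dealid sorted together.
sorted_rows = sorted(order_rows)

for dealid, group in groupby(sorted_rows, key=lambda r: r[0]):
    num, last_uid = 0, None
    for _, uid in group:
        if uid != last_uid:    # LastKey check: count each uid only once
            num += 1
            last_uid = uid
    print(dealid, num)         # 1 2  /  2 1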

What if there are multiple DISTINCT columns, as in the SQL below?

select dealid, count(distinct uid), count(distinct date) from order group by dealid;

There are two implementation strategies:

(1) If we keep the single-DISTINCT-column approach above (the implementation shown in the figure below), the rows cannot be sorted by uid and by date independently, so LastKey deduplication is impossible; the reduce stage still has to deduplicate in memory with hashing, as the sketch after the figure shows.

[figure: multi-column DISTINCT, approach (1)]
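A sketch of this fallback under assumed data: the rows for one dealid still arrive in a single sorted stream, so the reducer keeps one in-memory set per DISTINCT column.

from itertools import groupby

# Assumed toy rows (dealid, uid, date) for table order.
rows = [(1, 10, "d1"), (1, 10, "d2"), (1, 11, "d1"), (2, 10, "d1")]

for dealid, group in groupby(sorted(rows), key=lambda r: r[0]):
    uids, dates = set(), set()   # in-memory hash dedup; memory grows with cardinality
    for _, uid, date in group:
        uids.add(uid)
        dates.add(date)
    print(dealid, len(uids), len(dates))   # 1 2 2  /  2 1 1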

(2) Alternatively, number all the DISTINCT columns and expand every input row into n rows, one per DISTINCT column. Identical column values then sort together on their own, and the reduce stage only needs to record LastKey to deduplicate.

This approach exploits MapReduce's sort nicely and saves the memory the reduce stage would otherwise spend on deduplication, at the cost of a larger shuffle.

Note that when generating the reduce value, only the expanded row for the first DISTINCT column needs to carry the value field; in the other expanded rows the value field can be left empty.

[figure: multi-column DISTINCT, approach (2), row expansion]
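A sketch of the row-expansion trick with the same assumed data: each input row becomes one row per DISTINCT column, keyed by (dealid, column number, value); duplicates of each column then sort adjacently and a LastKey check suffices.

from itertools import groupby

rows = [(1, 10, "d1"), (1, 10, "d2"), (1, 11, "d1"), (2, 10, "d1")]

def expand(rows):
    """Emit one row per DISTINCT column: key = (dealid, column number, value)."""
    for dealid, uid, date in rows:
        yield (dealid, 0, uid)    # column 0 = uid
        yield (dealid, 1, date)   # column 1 = date

for dealid, group in groupby(sorted(expand(rows)), key=lambda r: r[0]):
    counts, last = [0, 0], None
    for key in group:             # keys arrive sorted, so duplicates are adjacent
        if key != last:           # LastKey check, no hash set needed
            counts[key[1]] += 1
            last = key
    print(dealid, counts[0], counts[1])   # 1 2 2  /  2 1 1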

2. The Conversion Process: Physical Implementation

Hive's overall architecture is shown below; the compiler component is responsible for converting HiveSQL into MapReduce tasks.

[figure: overall Hive architecture]

  • Basic conversion steps

Compiling HiveSQL into a MapReduce execution plan involves the following stages:

HiveSQL -> AST (abstract syntax tree) -> QueryBlock -> OperatorTree -> optimized OperatorTree -> MapReduce task tree -> optimized MapReduce task tree

[figure: HiveSQL-to-MapReduce compilation pipeline]

  1. Antlr defines the SQL grammar; lexical and syntactic analysis turn the SQL into an abstract syntax tree (AST).
  2. Traverse the AST and abstract out QueryBlocks, the basic units of the query.
  3. Traverse the QueryBlocks and translate them into an OperatorTree.
  4. The logical-layer optimizer rewrites the OperatorTree, merging unnecessary ReduceSinkOperators to cut shuffle volume.
  5. Traverse the OperatorTree and translate it into MapReduce tasks.
  6. The physical-layer optimizer rewrites the MapReduce tasks and produces the final execution plan.

2.1 Antlr

Hive uses Antlr for the lexical and syntactic analysis of SQL. Antlr is a language-recognition tool that can be used to build domain-specific languages.
We will not cover Antlr in detail here; it is enough to know that building a language with Antlr only requires writing a grammar file that defines the lexical rules and the syntactic rewrite rules. Antlr then generates the lexing and parsing machinery on which the later phases (semantic analysis, intermediate-code generation) are built.

2.2 AST Tree

After lexical and syntactic analysis, if the statement needs further processing, Antlr's abstract-syntax-tree facility is used: the input statement is converted into an AST during parsing, and subsequent passes walk this tree to do the remaining work.

2.3 QueryBlock

A QueryBlock is the most basic building block of a SQL statement and has three parts: an input source, a computation, and an output. Put simply, a QueryBlock corresponds to a subquery.

2.4 Operator

In the MapReduce tasks Hive finally generates, both the map stage and the reduce stage consist of OperatorTrees. A logical operator performs one single, specific operation within the map or reduce stage.

Basic operators include TableScanOperator, SelectOperator, FilterOperator, JoinOperator, GroupByOperator, and ReduceSinkOperator.

Because Join/GroupBy/OrderBy all have to be completed in the reduce stage, a ReduceSinkOperator is generated in front of the operator for each such operation; it combines the relevant columns and serializes them into the reduce key/value and the partition key.
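As a purely conceptual illustration (the classes below are stand-ins, not Hive's real operator classes), the join query from section 1.1 maps onto an operator chain roughly like this, with a ReduceSinkOperator inserted in front of the JoinOperator because the join runs on the reduce side:

from dataclasses import dataclass, field
from typing import List

@dataclass
class Operator:
    name: str
    children: List["Operator"] = field(default_factory=list)

    def then(self, child: "Operator") -> "Operator":
        """Append a downstream operator and return it, to allow chaining."""
        self.children.append(child)
        return child

def dump(op: Operator, depth: int = 0) -> None:
    """Print the operator tree as an indented outline."""
    print("  " * depth + op.name)
    for child in op.children:
        dump(child, depth + 1)

# The reduce-side join that both map branches feed into.
join = Operator("JoinOperator")

# One map-side branch per table; each ends in a ReduceSink that
# serializes uid as the reduce key / partition key.
for table in ("order", "user"):
    scan = Operator(f"TableScanOperator ({table})")
    scan.then(Operator("SelectOperator")) \
        .then(Operator("ReduceSinkOperator (key = uid)")) \
        .then(join)
    dump(scan)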

Most logical-layer optimizers work by transforming the OperatorTree and merging operators, so as to reduce the number of MapReduce jobs and the amount of shuffled data.
Turning the OperatorTree into MapReduce jobs proceeds as follows:
  1. Generate a MoveTask for the output table.
  2. Traverse depth-first downward from one of the OperatorTree's root nodes.
  3. ReduceSinkOperators mark the map/reduce boundary and the boundaries between jobs.
  4. Traverse the remaining root nodes, merging MapReduce tasks whenever a JoinOperator is encountered.
  5. Generate a StatsTask to update metadata.
  6. Cut the operator links between the map and reduce sides.

3. The EXPLAIN Statement

  • EXPLAIN helps us see how Hive turns a query into MapReduce tasks.
  • A statement prefixed with EXPLAIN is only planned, not actually executed.

hive> explain select count(*) from message;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce              
      Map Operator Tree:                        ## map phase of the job
          TableScan
            alias: message                      ## the TableScan reads table message as its input
            Statistics: Num rows: 1 Data size: 71 Basic stats: COMPLETE Column stats: COMPLETE
            Select Operator
              Statistics: Num rows: 1 Data size: 71 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()           ## the GroupBy operator applies count()
                mode: hash
                outputColumnNames: _col0        ## auto-generated name for the intermediate result column
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                Reduce Output Operator          ## map-side output that feeds the shuffle/reduce stage
                  sort order: 
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          File Output Operator                  ## a File Output Operator in the reduce tree means this stage writes its result out to files
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat   -- the stage's result is written as a SequenceFile, a binary (not text) format
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1                             ## the query has no LIMIT, so this fetch stage applies none
      Processor Tree:
        ListSink

Time taken: 0.06 seconds, Fetched: 42 row(s)

For full details, use EXPLAIN EXTENDED:

hive> explain extended select count(*) from message;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1


STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: message
            Statistics: Num rows: 1 Data size: 71 Basic stats: COMPLETE Column stats: COMPLETE
            GatherStats: false
            Select Operator
              Statistics: Num rows: 1 Data size: 71 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                Reduce Output Operator
                  null sort order: 
                  sort order: 
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                  tag: -1
                  value expressions: _col0 (type: bigint)
                  auto parallelism: false
      Path -> Alias:
        hdfs://localhost:9000/user/hive/derby/warehouse/saligia.db/message [message]
      Path -> Partition:
        hdfs://localhost:9000/user/hive/derby/warehouse/saligia.db/message 
          Partition
            base file name: message
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              colelction.delim :
              columns id,one
              columns.comments 
              columns.types tinyint:struct<name:string,age:int,grade:int>
              field.delim  
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://localhost:9000/user/hive/derby/warehouse/saligia.db/message
              name saligia.message
              numFiles 1
              numRows 0
              rawDataSize 0
              serialization.ddl struct message { byte id, struct<name:string,age:i32,grade:i32> one}
              serialization.format  
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              totalSize 71
              transient_lastDdlTime 1477793417
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                bucket_count -1
                colelction.delim :
                columns id,one
                columns.comments 
                columns.types tinyint:struct<name:string,age:int,grade:int>
                field.delim  
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://localhost:9000/user/hive/derby/warehouse/saligia.db/message
                name saligia.message
                numFiles 1
                numRows 0
                rawDataSize 0
                serialization.ddl struct message { byte id, struct<name:string,age:i32,grade:i32> one}
                serialization.format  
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                totalSize 71
                transient_lastDdlTime 1477793417
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: saligia.message
            name: saligia.message
      Truncated Path -> Alias:
        /saligia.db/message [message]
      Needs Tagging: false
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          File Output Operator
            compressed: false
            GlobalTableId: 0
            directory: hdfs://localhost:9000/tmp/hive/EVA/e8318af4-0af7-4629-832f-8711465a42e2/hive_2016-10-30_15-06-46_290_6355957965445092450-1/-mr-10001/.hive-staging_hive_2016-10-30_15-06-46_290_6355957965445092450-1/-ext-10002
            NumFilesPerFileSink: 1
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            Stats Publishing Key Prefix: hdfs://localhost:9000/tmp/hive/EVA/e8318af4-0af7-4629-832f-8711465a42e2/hive_2016-10-30_15-06-46_290_6355957965445092450-1/-mr-10001/.hive-staging_hive_2016-10-30_15-06-46_290_6355957965445092450-1/-ext-10002/
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                properties:
                  columns _col0
                  columns.types bigint
                  escape.delim \
                  hive.serialization.extend.additional.nesting.levels true
                  serialization.escape.crlf true
                  serialization.format 1
                  serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            TotalFiles: 1
            GatherStats: false
            MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 1.25 seconds, Fetched: 117 row(s)