Apache Calcite 是一款开源SQL解析工具, 能够将各类SQL语句解析成抽象语法术AST(Abstract Syntax Tree), 以后经过操做AST就能够把SQL中所要表达的算法与关系体如今具体代码之中。前端
Calcite的生前为Optiq(也为Farrago), 为Java语言编写, 经过十多年的发展, 在2013年成为Apache旗下顶级项目,并还在持续发展中, 该项目的创始人为Julian Hyde, 其拥有多年的SQL引擎开发经验, 目前在Hortonworks工做, 主要负责Calcite项目的开发与维护。java
目前, 使用Calcite做为SQL解析与处理引擎有Hive
、Drill、Flink、Phoenix、Phoenix和Storm,能够确定的是还会有愈来愈多的数据处理引擎采用Caclite做为SQL解析工具。node
总结来讲Calcite有如下主要功能:git
如上图中所述,通常来讲Calcite解析SQL有如下几步:github
Calcite主要有如下概念:算法
Catalog:主要定义被SQL访问的命名空间,主要包括如下几点:sql
RelDataType
定义RelDataType
表明表的数据定义,如表的数据列名称、类型等。Schema:数据库
public interface Schema { Table getTable(String name); Set<String> getTableNames(); Set<String> getFunctionNames(); Schema getSubSchema(String name); Set<String> getSubSchemaNames(); Expression getExpression(SchemaPlus parentSchema, String name); boolean isMutable();
Table:apache
public interface Table { RelDataType getRowType(RelDataTypeFactory typeFactory); Statistic getStatistic(); Schema.TableType getJdbcTableType(); }
其中RelDataType表明Row的数据类型, Statistic 用于统计表的相关数据、特别是在CBO用于计表计算表的代价。编程
一句Sql
selcct id, name, cast(age as bigint) from A.INFO
id, name
则为data type fieldbigint
为 data typeA
为schemaINFO
为表由Java CC编写,将SQL转化成AST.
SqlNode
, 而且Sqlnode
能够经过unparse
方法反向转化成SQLcast(id as float)
Java CC 可表示为
<CAST> <LPAREN> e = Expression(ExprContext.ACCEPT_SUBQUERY) <AS> dt = DataType() {agrs.add(dt);} <RPAREN> ....
首先看一下
INSERT INTO tmp_node SELECT s1.id1, s1.id2, s2.val1 FROM source1 as s1 INNER JOIN source2 AS s2 ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;
经过Calcite转化为:
LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false]) LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7]) LogicalFilter(condition=[AND(>($2, 5), =($8, 3))]) LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER]) LogicalTableScan(table=[[SOURCE1]]) LogicalTableScan(table=[[SOURCE2]])
是未经优化的RelNode树,能够发现最底层是TableScan,也是读取表的原始数据,紧接着是LogicalJoin,Joiner的类型为INNER JOIN, LogicalJoin以后接下作LogicalFilter 操做,对应SQL中的WHERE条件,最后作Project也就是投影操做。
可是咱们能够观察到对于INNER JOIN而言, WHERE 条件是能够下推,如
LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false]) LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7]) LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner]) LogicalFilter(condition=[=($4, 3)]) LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5]) LogicalTableScan(table=[[SOURCE1]]) LogicalFilter(condition=[>($3,5)]) LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5]) LogicalTableScan(table=[[SOURCE2]])
这样能够减小JOIN的数据量,提升SQL效率
实际过程当中能够将JOIN 的中条件下推以较少Join的数据量
INSERT INTO tmp_node SELECT s1.id1, s1.id2, s2.val1 FROM source1 as s1 LEFT JOIN source2 AS s2 ON s1.id1 = s2.id1 and s1.id2 = s2.id2 and s1.id3 = 5
s1.id3 = 5
这个条件能够先下推过滤s1中的数据, 但在特定场景下,有些不能下推,以下sql:
INSERT INTO tmp_node SELECT s1.id1, s1.id2, s2.val1 FROM source1 as s1 LEFT JOIN source2 AS s2 ON s1.id1 = s2.id1 and s1.id2 = s2.id2 and s2.id3 = 5
若是s1,s2是流式表(动态表,请参考Flink流式概念)的话,就不能下推,由于s1下推的话,因为过滤后没有数据驱动join操做,于是得不到想要的结果(详见Flink/Sparking-Streaming)
那接下来咱们可能有一个疑问,在什么状况下能够作相似下推、上推操做,又是根据什么原则进行的呢?以下图所示
T1 JOIN T2 JOIN T3
相似于此种状况JOIN的顺序是上图的前者仍是后者?这就涉及到Optimizer所使用的方法,Optimizer主要目的就是减少SQL所处理的数据量、减小所消耗的资源并最大程度提升SQL执行效率如:剪掉无用的列、合并投影、子查询转化成JOIN、JOIN重排序、下推投影、下推过滤等。目前主要有两类优化方法:基于语法(RBO)与基于代价(CBO)的优化
通俗一点的话就是事先定义一系列的规则,而后根据这些规则来优化执行计划。
如
ProjectFilterRule
此Rule的使用场景为Filter在Project之上,能够将Filter下推。假如某一个RelNode树
LogicalFilter LogicalProject LogicalTableScan
则可优化成
LogicalProject LogicalFilter LogicalTableScan
FilterJoinRule
此Rule的使用场景为Filter在Join之上,能够先作Filter而后再作Join, 以减小Join的数量
等等,还有不少相似的规则。但RBO必定程度上是经验试的优化方法,没法有一个公式上的判断哪一种优化更优。 在Calcite中实现方法为 HepPlanner
通俗一点的说法是:经过某种算法计算SQL全部可能的执行计划的“代价”,选择某一个代价较低的执行计划,如上文中三张表做JOIN, 通常来讲RBO没法判断哪一种执行计划优化更好,只有分别计算每一种JOIN方法的代价。
Calcite会将每一种操做(如LogicaJoin、LocialFilter、 LogicalProject、LogicalScan) 结合实际的Schema转化成具体的代价数,比较不一样的执行计划所具备的代价,而后选择相对小计划做为最终的结果,之因此说相对小,这是由于若是要彻底遍历计算全部可能的代价可能得不偿失,花费更多的人力与资源,所以只是说选择相对最优的执行计划。CBO目的是“避免使用最差的执行计划,而不是找到最好的”
目前Calcite中就是采用CBO进行优化,实现方法为VolcanoPlanner
,有关此算法的具体内容能够参考原码
因为Calcite是Java语言编写,所以只须要在工程或项目中引入相应的Jar包便可,下面为一个能够运行的例子:
public class TestOne { public static class TestSchema { public final Triple[] rdf = {new Triple("s", "p", "o")}; } public static void main(String[] args) { SchemaPlus schemaPlus = Frameworks.createRootSchema(true); //给schema T中添加表 schemaPlus.add("T", new ReflectiveSchema(new TestSchema())); Frameworks.ConfigBuilder configBuilder = Frameworks.newConfigBuilder(); //设置默认schema configBuilder.defaultSchema(schemaPlus); FrameworkConfig frameworkConfig = configBuilder.build(); SqlParser.ConfigBuilder paresrConfig = SqlParser.configBuilder(frameworkConfig.getParserConfig()); //SQL 大小写不敏感 paresrConfig.setCaseSensitive(false).setConfig(paresrConfig.build()); Planner planner = Frameworks.getPlanner(frameworkConfig); SqlNode sqlNode; RelRoot relRoot = null; try { //parser阶段 sqlNode = planner.parse("select \"a\".\"s\", count(\"a\".\"s\") from \"T\".\"rdf\" \"a\" group by \"a\".\"s\""); //validate阶段 planner.validate(sqlNode); //获取RelNode树的根 relRoot = planner.rel(sqlNode); } catch (Exception e) { e.printStackTrace(); } RelNode relNode = relRoot.project(); System.out.print(RelOptUtil.toString(relNode)); } }
类Triple 对应的表定义:
public class Triple { public String s; public String p; public String o; public Triple(String s, String p, String o) { super(); this.s = s; this.p = p; this.o = o; } }
详细能够代码在这里
Calcite的功能远不止以上介绍,除了标准SQL的,还支持如下内容:
以上内容主要介绍上Calcite相关概念并经过相例子说明了Calcite使用方法, 但愿经过上述内容,读者能对Calcite有初步的了解。
因为笔者使用和探索Calcite时间也不长,以上内容不免有错误与不许确之处,还望各位读者不吝指正,相互学习。
参考文献与网址: