UserServer handles the RUN_QUERY_VALUE query requests from clients and dispatches each one to a UserWorker, which submits the work.
Naturally, the worker must be constructed along with the UserServer, so that the moment a task arrives a worker is ready to take it on.
UserServer is constructed inside ServiceEngine, and the service engine is created by DrillBit.
UserWorker is managed by WorkerManager, which is also created by DrillBit.
So once the DrillBit service is up, every role involved in the computation is already in place.
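A simplified sketch of this wiring, matching the roles in the table below (the classes are Drill's, but the constructor signatures here are abbreviated for illustration, not Drill's exact code):

```java
// Illustrative only: real Drill passes more collaborators around,
// but the ownership structure is the point.
public class Drillbit {
  private final WorkerManager manager;  // owns the UserWorker / WorkerBee
  private final ServiceEngine engine;   // owns UserServer, Controller, DataServer

  public Drillbit(BootStrapContext context) {
    this.manager = new WorkerManager(context);
    // UserServer gets its UserWorker at construction time, so a worker
    // is ready the moment a RUN_QUERY request arrives.
    this.engine = new ServiceEngine(manager.getUserWorker(), context);
  }
}
```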
Role | Description |
---|---|
WorkerBee | The worker bee: the one that actually does the work |
UserWorker | The worker for user operations, built on top of a WorkerBee |
WorkerManager | The workers' manager, responsible for picking a worker for a job |
UserServer | The server side of user operations; it hands work to a UserWorker, so it needs one |
Foreman | The contractor/supervisor, created by the UserWorker. Since UserWorker is backed by a WorkerBee, the WorkerBee and the Foreman get associated |
ServiceEngine | The service engine, managing UserServer and Controller |
DrillBit | Drill's server-side control process, managing ServiceEngine and WorkerManager |
BootStrapContext | Context for bootstrapping a DrillBit, including configuration and the metrics registry |
DrillbitContext | Context for a DrillBit while it is working |
Controller | Communication between different DrillBit nodes |
ControlServer | RPC server for message transport and connections between nodes |
DataServer | RPC server responsible for data exchange |
First, let's look at how UserWorker submits a task:
```java
public class UserWorker {
  private final WorkerBee bee;

  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    // Create a new queryid where the first four bytes are a growing time
    // (each new value comes earlier in sequence). Last 12 bytes are random.
    long time = (int) (System.currentTimeMillis() / 1000);
    long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
    long p2 = r.nextLong();
    QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
    incrementer.increment(connection.getSession());
    Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
    bee.addNewForeman(foreman);
    return id;
  }
}
```
The returned QueryId is sent to the client by UserServer over RPC; it identifies this particular query for the client and signals that the server has accepted it.
The server has not actually started executing the query yet. Later, the client can present this QueryId to ask the server for the result data.
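The comment in submitWork claims that newer QueryIds sort earlier. A quick standalone check of that property (a sketch, not Drill code): the high 32 bits of part1 hold Integer.MAX_VALUE - time, which shrinks as time grows.

```java
public class QueryIdOrder {
  public static void main(String[] args) {
    long t0 = System.currentTimeMillis() / 1000;
    long t1 = t0 + 60;                          // a query submitted one minute later
    long p1a = (Integer.MAX_VALUE - t0) << 32;  // random low bits omitted; they only break ties
    long p1b = (Integer.MAX_VALUE - t1) << 32;
    System.out.println(p1b < p1a);              // true: the newer query gets the smaller part1
  }
}
```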
As the name suggests, a WorkerBee is a bee that works: worker bees toil away quietly, serving the queen bee, the Foreman.
We have now had the UserWorker create a Foreman, and the worker bee takes it in (bee.addNewForeman(foreman)).
Questions:
1. Why does the WorkerBee (worker bee) actively take in the Foreman (supervisor), instead of the Foreman managing the WorkerBee?
2. Why doesn't the Foreman, as a runnable task, start itself? Why must a worker start it?
Foreman manages all the fragments of a single query, acting as the root/driving node:
```java
/**
 * Foreman manages all the fragments (local and remote) for a single query
 * where this is the driving/root node. The flow is as follows:
 * - Foreman is submitted as a runnable.
 * - Runnable does query planning.
 * - State changes from PENDING to RUNNING.
 * - Runnable sends out starting fragments.
 * - Status listeners are activated.
 * - The Runnable's run() completes, but the Foreman stays around.
 * - Foreman listens for state change messages.
 * - State change messages can drive the state to FAILED or CANCELED,
 *   in which case messages are sent to running fragments to terminate.
 * - When all fragments complete, state change messages drive the state to COMPLETED.
 */
public class Foreman implements Runnable {
  private final QueryId queryId;            // the id for the query
  private final RunQuery queryRequest;      // the query to execute
  private final QueryContext queryContext;
  private final QueryManager queryManager;  // handles lower-level details of query execution
  private final WorkerBee bee;              // provides an interface to submit tasks, used to submit additional work
  private final DrillbitContext drillbitContext;
  private final UserClientConnection initiatingClient; // used to send responses

  // Sets up the Foreman, but does not initiate any execution.
  public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
      final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
    this.bee = bee;
    this.queryId = queryId;
    this.queryRequest = queryRequest;
    this.drillbitContext = drillbitContext;
    this.initiatingClient = connection;
    this.closeFuture = initiatingClient.getChannel().closeFuture();
    closeFuture.addListener(closeListener);
    queryContext = new QueryContext(connection.getSession(), drillbitContext);
    queryManager = new QueryManager(queryId, queryRequest,
        drillbitContext.getPersistentStoreProvider(), stateListener, this);
    recordNewState(QueryState.PENDING);
  }
}
```
Foreman's run() method takes different paths depending on the RunQuery type. For a SQL query, it must parse the SQL statement into a logical plan via Calcite, generate the physical plan, and finally run the physical plan.
```java
private void runSQL(final String sql) throws ExecutionSetupException {
  final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
  final Pointer<String> textPlan = new Pointer<>();
  final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
  queryManager.setPlanText(textPlan.value);
  runPhysicalPlan(plan);
}
```
Calcite's planner parses the SQL into a SqlNode tree; different SqlNode kinds are then handled by different handlers.
```java
public class DrillSqlWorker {
  private final Planner planner;  // both planners are Calcite's; they produce the logical plan
  private final HepPlanner hepPlanner;
  private final QueryContext context;

  public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
    SqlNode sqlNode = planner.parse(sql); // parse the SQL statement into a SqlNode parse tree ①
    AbstractSqlHandler handler;
    SqlHandlerConfig config = new SqlHandlerConfig(hepPlanner, planner, context);
    switch (sqlNode.getKind()) {
    case EXPLAIN:
      handler = new ExplainHandler(config);
      break;
    case SET_OPTION:
      handler = new SetOptionHandler(context);
      break;
    case OTHER:
      if (sqlNode instanceof SqlCreateTable) {
        handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
        break;
      }
      if (sqlNode instanceof DrillSqlCall) {
        handler = ((DrillSqlCall) sqlNode).getSqlHandler(config);
        break;
      }
    default:
      handler = new DefaultSqlHandler(config, textPlan);
    }
    return handler.getPlan(sqlNode);
  }
}
```
The Drillbit that receives the query from a client or application becomes the Foreman for the query and drives the entire query.
A parser in the Foreman parses the SQL[①], applying custom rules[②] to convert specific SQL operators into a specific logical operator syntax that Drill understands.
This collection of logical operators forms a logical plan. The logical plan describes the work required to generate the query results and defines what data sources and operations to apply.
In other words, the parser in the Foreman parses the SQL [①] and applies custom rules [②] to turn the SQL operators (Calcite nodes) into the logical operators Drill understands (Drill's DrillRel nodes); the converted logical operators make up the logical plan. Note that sqlNode = planner.parse(sql) above yields the SQL operators; the conversion to DrillRel nodes happens later, in the handler's getPlan.
Calcite's programming API mainly consists of: Operator, Rule, RelationalExpression, and SqlNode.
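For orientation, here is a minimal, self-contained sketch of Calcite's Frameworks/Planner API, which DrillSqlWorker builds on (the empty schema and trivial query are illustrative):

```java
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.*;

public class CalcitePlannerSketch {
  public static RelNode sqlToRel()
      throws SqlParseException, ValidationException, RelConversionException {
    SchemaPlus rootSchema = Frameworks.createRootSchema(true);
    FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(rootSchema)
        .build();
    Planner planner = Frameworks.getPlanner(config);
    SqlNode parsed = planner.parse("select 1 + 1"); // SqlNode: the SQL parse tree
    SqlNode validated = planner.validate(parsed);
    // The Calcite of this era exposes convert(); newer versions use planner.rel(validated).rel
    return planner.convert(validated);              // RelNode: a relational expression
  }
}
```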
When Calcite's planner parses and plans the SQL, it uses not only Calcite's own rules: Drill attaches additional rules via getRules, defined in DrillSqlWorker's constructor.
The rules cover physical planning, logical planning, and conversion. The logical rules comprise the basic rules plus user-configurable rules; the physical rules comprise the physical rules proper plus the storage plugins' rules. The hive plugin, for example, has its own SQL conversion rules.
```java
public DrillSqlWorker(QueryContext context) {
  FrameworkConfig config = Frameworks.newConfigBuilder()
      ...
      .ruleSets(getRules(context)) // the rules Drill attaches ②
      ...
      .build();
  this.planner = Frameworks.getPlanner(config);
}

private RuleSet[] getRules(QueryContext context) {
  StoragePluginRegistry storagePluginRegistry = context.getStorage();
  RuleSet drillLogicalRules = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getDrillBasicRules(context),
      DrillRuleSets.getJoinPermRules(context),
      DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
  RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getPhysicalRules(context),
      storagePluginRegistry.getStoragePluginRuleSet());
  // Following is used in LOPT join OPT.
  RuleSet logicalConvertRules = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getDrillBasicRules(context),
      DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
  RuleSet[] allRules = new RuleSet[] {drillLogicalRules, drillPhysicalMem, logicalConvertRules};
  return allRules;
}
```
The basic logical-planning rules are generic ones that need not wait for the physical planning stage; generic rules should be applied as early as possible.
```java
// Get an immutable list of rules that will always be used when running logical planning.
public static RuleSet getDrillBasicRules(QueryContext context) {
  DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.<RelOptRule>builder().add(
      // Add support for Distinct Union (by using Union-All followed by Distinct)
      UnionToDistinctRule.INSTANCE,
      // Add support for WHERE-style joins.
      DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
      DrillFilterJoinRules.DRILL_JOIN,
```
Here is an example of such a WHERE-style join rule conversion (http://blog.aliyun.com/733):
SELECT * FROM A JOIN B ON A.ID=B.ID WHERE A.AGE>10 AND B.AGE>5

Predicate Push Down: when a query contains a JOIN, the user will quite likely apply a WHERE after it. The optimizer then analyzes, in relational-algebra terms,
whether the WHERE conditions can be moved in front of the JOIN, reducing the amount of data the JOIN has to process and improving efficiency.
Here, conceptually, the query can be rewritten as SELECT * FROM (SELECT * FROM A WHERE AGE>10) a JOIN (SELECT * FROM B WHERE AGE>5) b ON a.ID=b.ID.
So what do Drill's FilterJoin rules look like?
```java
public class DrillFilterJoinRules {
  /**
   * Predicate that always returns true for any filter in OUTER join, and only true
   * for EQUAL or IS_DISTINCT_FROM over RexInputRef in INNER join. With this predicate,
   * the filter expression that returns true will be kept in the JOIN OP.
   * Example: INNER JOIN, L.C1 = R.C2 and L.C3 + 100 = R.C4 + 100 will be kept in JOIN.
   * L.C5 < R.C6 will be pulled up into Filter above JOIN.
   * OUTER JOIN: keep any filter in JOIN.
   */
  public static final FilterJoinRule.Predicate EQUAL_IS_DISTINCT_FROM =
      new FilterJoinRule.Predicate() {
        public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
          // In OUTER join, we could not pull up the filter. All we can do is keep the
          // filter with JOIN, and then decide whether the filter could be pushed down
          // into LEFT/RIGHT.
          if (joinType != JoinRelType.INNER) {
            return true;
          }
          List<RexNode> tmpLeftKeys = Lists.newArrayList();
          List<RexNode> tmpRightKeys = Lists.newArrayList();
          List<RelDataTypeField> sysFields = Lists.newArrayList();
          RexNode remaining = RelOptUtil.splitJoinCondition(sysFields, join.getLeft(),
              join.getRight(), exp, tmpLeftKeys, tmpRightKeys, null, null);
          return remaining.isAlwaysTrue();
        }
      };

  /** Rule that pushes predicates from a Filter into the Join below them. */
  public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
      new FilterJoinRule.FilterIntoJoinRule(true, RelFactories.DEFAULT_FILTER_FACTORY,
          RelFactories.DEFAULT_PROJECT_FACTORY, EQUAL_IS_DISTINCT_FROM);
}
```
It helps to first understand some Calcite concepts, otherwise this part can be hard to follow.
See http://blog.csdn.net/yunlong34574/article/details/46375733 for a walkthrough of the optiq-javaben project's source code,
and https://datapsyche.wordpress.com/2014/08/06/optiq-query-push-down-conc... for query push-down optimization.
The following example is borrowed from the Optiq author's Apache Calcite Overview slides.
Two tables are joined and then a WHERE filter is applied; without the rule, filtering can only happen after the join completes.
With FilterJoinRule, the Filter is hoisted before the Join and applied right after the scan, reducing the volume of data entering the join:
So how is a rule defined? When onMatch is invoked, the matched RelationalExpressions are available through the call in operand order, here [Filter, Join, Scan]:
we fetch the Filter and the Join by index, where rel(0) is the Filter (the root of the matched pattern) and rel(1) is the Join.
Finally, call.transformTo(newJoin) registers the new relational expression as equivalent to the original one.
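As a concrete illustration, here is a minimal rule in that style. It is a sketch in the spirit of FilterIntoJoinRule, not Drill's actual code, and for simplicity it ignores the inner/outer distinction that Drill's EQUAL_IS_DISTINCT_FROM predicate handles:

```java
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;

// Match Filter-over-Join and register an equivalent Join that absorbs the filter.
public class MergeFilterIntoJoinRule extends RelOptRule {
  public static final MergeFilterIntoJoinRule INSTANCE = new MergeFilterIntoJoinRule();

  private MergeFilterIntoJoinRule() {
    // Operand pattern: a Filter whose input is a Join (with any inputs below it).
    super(operand(Filter.class, operand(Join.class, any())));
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    Filter filter = call.rel(0); // index 0 = root of the matched pattern
    Join join = call.rel(1);
    // Fold the filter predicate into the join condition.
    RexNode newCondition = RexUtil.composeConjunction(
        join.getCluster().getRexBuilder(),
        ImmutableList.of(join.getCondition(), filter.getCondition()),
        false);
    // Register the equivalent expression; the planner keeps the cheaper form.
    call.transformTo(join.copy(join.getTraitSet(), newCondition,
        join.getLeft(), join.getRight(), join.getJoinType(), join.isSemiJoinDone()));
  }
}
```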
Run the following SQL statements: the first without a WHERE, the second with a WHERE filter, and the third with a WHERE that compares two fields.
```sql
select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY;

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NATIONKEY > 10 and regions.R_NAME = 'AMERICA';

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NAME < regions.R_NAME;
```
Below are the corresponding physical-plan visualizations. In figure 1 there is a Project between the Scan and the JOIN.
In figure 2, although the WHERE filter comes after the join in the SQL text, after optimization it runs before the join: the filter executes first, then the join.
Figure 3 is not so lucky: its filter can only run after the join.
There are many more rules, all defined in DrillRuleSets.
The SqlNode argument to getPlan is the result of Calcite's parsing above: a SQL parse tree (don't let the name Node suggest a single node).
But it still consists of the SQL operators Calcite understands; we must convert it into the logical operators Drill understands, i.e. DrillRel.
```java
public PhysicalPlan getPlan(SqlNode sqlNode)
    throws ValidationException, RelConversionException, IOException, ForemanSetupException {
  SqlNode rewrittenSqlNode = rewrite(sqlNode);
  TypedSqlNode validatedTypedSqlNode = validateNode(rewrittenSqlNode);
  SqlNode validated = validatedTypedSqlNode.getSqlNode();
  RelDataType validatedRowType = validatedTypedSqlNode.getType();

  RelNode rel = convertToRel(validated);
  rel = preprocessNode(rel);
  log("Optiq Logical", rel);

  DrillRel drel = convertToDrel(rel, validatedRowType);
  log("Drill Logical", drel);

  Prel prel = convertToPrel(drel);
  log("Drill Physical", prel);

  PhysicalOperator pop = convertToPop(prel);
  PhysicalPlan plan = convertToPlan(pop);
  log("Drill Plan", plan);
  return plan;
}
```
Relational Expression (Rel)
As mentioned in the query walkthrough, an execution plan always contains a Screen operator, which blocks and waits for the returned data. The DrillRel returned here is the logical plan.
SqlNode and RelNode are Calcite nodes; DrillRel is Drill's relational-expression node, and the outermost node is wrapped in a Screen for screen output.
```java
protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) {
  ... // conversion steps elided; they produce convertedRelNode from relNode
  // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
  DrillRel topPreservedNameProj = addRenamedProject((DrillRel) convertedRelNode, validatedRowType);
  return new DrillScreenRel(topPreservedNameProj.getCluster(),
      topPreservedNameProj.getTraitSet(), topPreservedNameProj);
}
```
Below are the Screen node and some of the other DrillRel constructors. The input parameter is the Screen's input: the original node is wrapped by the Screen node, forming a new node.
```java
public class DrillScreenRel extends DrillScreenRelBase implements DrillRel {
  public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
    super(DRILL_LOGICAL, cluster, traitSet, input);
  }

  public LogicalOperator implement(DrillImplementor implementor) {
    LogicalOperator childOp = implementor.visitChild(this, 0, getInput());
    return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build();
  }
}
```
Class hierarchy: DrillScreenRel >> DrillRel >> DrillRelNode >> RelNode.
DrillRel is the relational expression of the logical plan; subclasses must implement the implement method and return a logical operator.
```java
// Relational expression that is implemented in Drill.
public interface DrillRel extends DrillRelNode {
  // Calling convention for relational expressions that are "implemented" by
  // generating Drill logical plans.
  public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);

  LogicalOperator implement(DrillImplementor implementor);
}
```
DrillImplementor: Context for converting a tree of DrillRel nodes into a Drill logical plan
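A sketch of how such an implementor is typically driven; the go/getPlan method names here are assumptions based on that description, not verified Drill code:

```java
// Hypothetical usage: walk the DrillRel tree and collect the logical plan.
// go() visits each node, and each node calls back into implement(implementor).
DrillImplementor implementor = new DrillImplementor(parseContext, ResultMode.EXEC);
implementor.go(drel);  // drel is the DrillScreenRel root returned by convertToDrel
LogicalPlan logicalPlan = implementor.getPlan();
```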
The logical plan is then converted into a physical plan, turning the DrillRel into a Prel, and only after that into Drill's Plan. Note that Drill's physical plan and the final Plan differ slightly.
```java
protected Prel convertToPrel(RelNode drel) {
  Prel phyRelNode = (Prel) planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel);

  /* The order of the following transformations is important. */

  /*
   * 0.) For select * from join query, we need insert project on top of scan and a top project just
   * under screen operator. The project on top of scan will rename from * to T1*, while the top project
   * will rename T1* to *, before it outputs the final result. Only the top project will allow
   * duplicate columns, since user could "explicitly" ask for duplicate columns (select *, col, *).
   * The rest of projects will remove the duplicate column when we generate POP in json format.
   */
  phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode); // * is "star": this column must be converted
}
```
The conversion process is fairly involved, and the order of the transformations matters. Consider the first one: for a select * from ... join query, two Projects must be inserted,
one above the scan (bottom) and one just under the screen (top). Take the following SQL statement:
```sql
select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/nation.parquet` nations
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY;
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| N_NATIONKEY  | N_NAME          | N_REGIONKEY  | N_COMMENT             | R_REGIONKEY  | R_NAME       | R_COMMENT             |
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| 0            | ALGERIA         | 0            | haggle. carefully f   | 0            | AFRICA       | lar deposits. blithe  |
| 1            | ARGENTINA       | 1            | al foxes promise sly  | 1            | AMERICA      | hs use ironic, even   |
```
The physical plan:
```
00-00 Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {62.5 rows, 402.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2432
00-01 ⑤ ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2431
00-02 ④ Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2430
00-03 ③ HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2429
00-05 ① Project(T0¦¦*=[$0], N_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2426
00-07 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, N_REGIONKEY]): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2425
00-04 ② Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2428
00-06 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2427
```
The corresponding visualized plan:
The $0, $1, ... in the physical plan stand for the variables after as. With a join, column names may be identical, so Projects are also inserted to rename columns and prevent name clashes:
```sql
① select T0.* as $0, T0.N_REGIONKEY as $1 from nations T0
② select T1.* as $0, T1.R_REGIONKEY as $1 from regions T1
③ select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3
  from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
  join (select T1.$0 as $2, T1.$1 as $3 from regions) T1 on T0.$1 = T1.$3
④ select $0 as $0, $2 as $1 from (
    select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3
    from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
    join (select T1.$0 as $2, T1.$1 as $3 from regions) T1 on T0.$1 = T1.$3
  )
⑤ select $0 as *, $1 as *0 from (
    select $0 as $0, $2 as $1 from (
      select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3
      from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
      join (select T1.$0 as $2, T1.$1 as $3 from regions) T1 on T0.$1 = T1.$3
    )
  )
select *, *0 from ...
```
The StarColumn rule above is a bit involved, so let's look at the rule for Join column conflicts instead, corresponding to ③ JOIN above: all the columns get renamed ($0, $1, $2, $3), and the join is then performed on $1 and $3.
```java
/*
 * 1.) Join might cause naming conflicts from its left and right child.
 * In such case, we have to insert Project to rename the conflicting names.
 */
phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
```
As the comment says, the join has a left and a right child. Note the word child: the join is the root, and the left and right tables are its left and right child nodes.
To avoid name conflicts, Projects are inserted, which corresponds exactly to the visualized plan we saw above.
Now consider: is the Project inserted for the join the one at ① and ②, or the one at ④?
I believe it is ④, because ① and ② were already produced by the first transformation above, StarColumnConverter.
insertRenameProject has the incoming phyRelNode call its accept method, passing in the JoinPrelRenameVisitor instance:
```java
public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
  private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor();

  public static Prel insertRenameProject(Prel prel) {
    return prel.accept(INSTANCE, null);
  }
}
```
The Prel here passes through rule after rule, and what comes back is still a Prel; in other words, each time a rule is applied, the current latest value is passed back in. Prel also implements the DrillRelNode interface.
DrillRelNode combined with the Visitor produces this layered, nested feel: first the operators' rules are registered, forming a graph, and then, while visiting each operator along the DAG, the rules are applied.
Suppose the Prel passed into JoinPrelRenameVisitor.insertRenameProject above is a JoinPrel:
```java
public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
    return logicalVisitor.visitJoin(this, value);
  }

  public Iterator<Prel> iterator() {
    return PrelUtil.iter(getLeft(), getRight());
  }
}
```
The logicalVisitor argument of accept() is then obviously the JoinPrelRenameVisitor, and this is the current object, the JoinPrel.
So JoinPrelRenameVisitor's visitJoin method is invoked; see, we are back in the Visitor:
```java
public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
    List<RelNode> children = Lists.newArrayList();
    for (Prel child : prel) {
      child = child.accept(this, null);
      children.add(child);
    }
    final int leftCount = children.get(0).getRowType().getFieldCount();
    List<RelNode> reNamedChildren = Lists.newArrayList();
    RelNode left = prel.getJoinInput(0, children.get(0));
    RelNode right = prel.getJoinInput(leftCount, children.get(1));
    reNamedChildren.add(left);
    reNamedChildren.add(right);
    return (Prel) prel.copy(prel.getTraitSet(), reNamedChildren);
  }
}
```
JoinPrel is iterable, so a for-loop traverses its nodes, namely the left and right tables participating in the join (thanks to the iterator method implemented above).
getJoinInput takes an offset and a RelNode; the offset is the index of that input's first column within the join's output (the combined columns of both joined tables).
If we join a table with itself, we can see that the duplicated columns of the right-hand table get renamed:
```sql
select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` region1
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
  on region1.R_REGIONKEY = regions.R_REGIONKEY;
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| R_REGIONKEY  | R_NAME       | R_COMMENT             | R_REGIONKEY0  | R_NAME0      | R_COMMENT0            |
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| 0            | AFRICA       | lar deposits. blithe  | 0             | AFRICA       | lar deposits. blithe  |
```
getJoinInput is called twice, with a different offset and input each time, so the two calls examine different slices of the join's output.
```java
// Check to make sure that the fields of the inputs are the same as the output field names.
// If not, insert a project renaming them.
public RelNode getJoinInput(int offset, RelNode input) {
  final List<String> fields = getRowType().getFieldNames();
  final List<String> inputFields = input.getRowType().getFieldNames();
  final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
  if (!outputFields.equals(inputFields)) {
    // Ensure that input field names are the same as output field names.
    // If there are duplicate field names on left and right, fields will get lost.
    // In such case, we need insert a rename Project on top of the input.
    return rename(input, input.getRowType().getFieldList(), outputFields);
  } else {
    return input;
  }
}
```
When does the if branch actually fire? Suppose both tables have the columns A, B, C.
The left table cannot conflict with itself, but relative to the left table all three columns of the right one are duplicates; for it we call getJoinInput(3, rightNode).
One might expect inputFields = [A, B, C], fields = [A, B, C, A, B, C] and outputFields = [A, B, C], i.e. equal. But the join's row type deduplicates column names by appending a numeric suffix, so fields is really [A, B, C, A0, B0, C0] and outputFields = [A0, B0, C0], which does not equal inputFields = [A, B, C]; that mismatch triggers the rename Project.
Compare with the visualized tree of this self-join: a Project has been added at 00-04:
```
00-00 Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.5 rows, 120.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1299
00-01 ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1298
00-02 Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1297
00-03 HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1296
00-04 Project(T1¦¦*=[$0], R_REGIONKEY0=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1295
00-06 Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1294
00-08 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1293
00-05 Project(T0¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1292
00-07 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1291
```