Apache Drill Source Code Analysis (2): Walking Through a Query and the RPC Layer



The Lifecycle of a Query

The queryRequest in the Foreman thread's run() method is a RunQuery from org.apache.drill.exec.proto.UserProtos.
It can be regarded as the query statement the user typed; since Drill is distributed, a query entered on the client is shipped over RPC and executed on the Foreman.
The protobuf files are defined under drill-protocol/src/main/protobuf; for example, User.proto corresponds to UserProtos.

The key is the comment above run(). When is it called? When the query has been set up.
How is it invoked? By the execution pool. What does it do? It completes the remote execution.
Note that this method returning does not mean the Foreman's role in the query's lifecycle is over.

Called by execution pool to do query setup, and kick off remote execution.
Note that completion of this function is not the end of the Foreman's role in the query's lifecycle.

https://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-...
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1974556

Client

The SELECT statement in particular is querying an HDFS data file, with a specific WHERE clause filtering based on the expression clause.

From the client, the statement is submitted through Sqlline, a simple Java-based console that talks to a JDBC driver, which passes the SELECT statement into Optiq.

Optiq is a library that Drill uses for query parsing and planning; it provides pluggable transformation rules that can map a SQL token into any specific object you want. Optiq also embeds a cost-based query optimizer, which Drill uses to choose the best order of SQL operators in the statement without any other statistics. Drill implements custom Optiq rules that map specific SQL operators (WHERE, LIMIT, etc.), and each rule converts its operator into the specific Logical Operator syntax that Drill understands.

This collection of logical operators with their own configurations forms a Drill logical plan. The logical plan's sole purpose is to describe, logically, what work Drill needs to perform to produce the query results, without any optimization or distribution.

Once the client generates this logical plan, it looks up the host/port information of one of the DrillBits and passes the logical plan to that DrillBit.

The sole purpose of the Drill logical plan is to describe the data flow of the work Drill has to do; it performs no optimization and no distribution of the computation.
Once the client has produced the logical plan, it looks up the host/port of one of the configured DrillBits
and passes the logical plan to that DrillBit (the DrillBit that receives the query is the Foreman).

Running Logical Plan

Any DrillBit in the cluster can initiate a query, and that DrillBit becomes the Drill process responsible for sending the results back to the client.

In the cluster any DrillBit can run a query, and the DrillBit executing it is responsible for returning the query results to the client.

Once the UserServer that is listening for user submissions gets the logical plan, it passes it to the Foreman, which is responsible for turning this plan into an actual execution plan and submitting the plan information for execution.

The UserServer listens for query submissions from clients; once it gets the logical plan, it passes it to the Foreman.
The Foreman optimizes the plan, turns it into an actual execution plan, and submits that plan's information for the execution that follows.

Inside the Foreman, the first operation is to transform the logical plan into a physical plan via Drill's Optimizer. Drill's current Optimizer is very basic: it simply transforms each logical operator directly into one or more physical operators, without many optimization rules that consider the relationships between operators.

The physical plan is simply a DAG of physical operators, and each child/parent relationship implies how data flows through the graph. Inspired by Google's Dremel paper, the design implemented here is an MPP-style multi-level execution tree, where each node in the execution tree represents a different DrillBit process and the nodes depend on each other's results for their computation.

The physical plan is a DAG of physical operators; every child/parent relationship implies how data flows through the graph.
In this execution tree, each node represents a different DrillBit process, and the nodes depend on each other's computation results.

As we want to break this physical plan into a multi-level execution tree that involves multiple DrillBits, we first need to collect information about each physical operator. Each physical operator with the given operator configuration can return estimated stats about Network/Cpu/Memory/Disk and Row size and count. It also can return a list of preferred DrillBits that it wants to get executed on, which we call endpoint affinities.

To break the physical plan into a multi-level execution tree involving multiple DrillBits, information about every physical operator has to be collected first.
Given its operator configuration, each physical operator can return estimated stats about network/CPU/memory/disk usage as well as row size and row count.
It can also return the list of DrillBits on which it would prefer to be executed, called the endpoint affinities.

This resembles a read in HDFS: when a block is read, the NameNode returns the block's DataNode list, and the client reads the replica on the DataNode closest to it.

One example of endpoint affinity is a Parquet Scan operator that wants to initiate the query closest to where the Parquet file is stored: it can look up the Parquet file's metadata, which stores the HDFS data node host, and return that as a preferred endpoint if a DrillBit is running there.

For example, a Parquet scan operator wants to initiate the query as close as possible to where the Parquet file is stored.
It can look up the Parquet file's metadata, which records the HDFS DataNodes holding the file, and return those as preferred endpoints.

A Parquet file is self-describing, much like JSON: the file itself carries its schema, even though that schema is free-form.
Since Parquet files live on HDFS, they are replicated. The Scan operator has to read the file,
so the best case is for it to read the Parquet replica on the DataNode closest to it.
Of course, a node only qualifies as a preferred endpoint if a DrillBit runs there, because the DrillBit is what hosts the operator.

In other words, the DrillBit services in the cluster execute the physical operators that a physical plan decomposes into,
and a physical operator can be executed by several DrillBits in the cluster.
A DrillBit compute node usually runs alongside a storage node such as an HDFS DataNode, and the operators need the data stored there,
so an operator prefers the DrillBit closest to the storage; that DrillBit is its best endpoint.

With the physical plan, all the stats and the endpoint affinities, the Parallelizer in the Foreman transforms this plan into a number of fragments. Each fragment is simply a self-contained physical plan designed to run on a DrillBit node. In any given physical plan there will be exactly one Root fragment, which runs on the initiating DrillBit, plus possibly one or more Leaf fragments and possibly one or more intermediate fragments.

With the physical plan, all the stats and the endpoint affinities, the Parallelizer inside the Foreman turns the plan into a number of fragments.
Each fragment is itself a small physical plan, and each is assigned to run on a DrillBit node.
For any physical plan (after the Foreman's transformation) there is exactly one root fragment, plus possibly several intermediate and leaf fragments.

Running Fragments

The Root fragment will be submitted to the Worker manager in its current DrillBit, the intermediate fragments will be stored in the HazelCast distributed cache, and all the leaf fragments will be sent directly to the DrillBits assigned via our BitCom through our RPC layer with Protobuf messages.

The root fragment is submitted to the Worker Manager on its current DrillBit; intermediate fragments are stored in the HazelCast distributed cache,
and all leaf fragments are sent directly to their assigned DrillBits via BitCom (part of the RPC layer, speaking Protobuf).

The web UI only shows Major and Minor fragments, so where do the Root, Intermediate and Leaf fragments described here come from?

The Worker Manager, once it receives this Root fragment, starts running the plan, which always contains a Screen operator that blocks and waits for records to be returned. If the plan has multiple Drillbits involved, it will also contain an exchange operator that sets up a Receiver waiting for incoming records from the wire.

As soon as the Worker Manager receives the root fragment it runs that plan, which always contains a Screen operator that blocks and waits for the returning data.
If the plan needs other DrillBits, those DrillBits form the wire, and the plan also contains an exchange operator that sets up a Receiver to wait for data coming off the wire.

The wire resembles the DataNode pipeline in HDFS: when a client writes data, the DataNodes storing it form a pipeline through which packets flow,
and the write only succeeds once every DataNode has acked back to the client. The DrillBits taking part in the computation behave the same way:
the exchange operator plays the client's role, and the computation is only considered done once the data from all DrillBits on the wire has reached the Receiver.

The Leaf fragments that are sent over the wire are executed as soon as they arrive. The fragment is parsed into a DAG of Physical operators, which sets up the execution/data flow. Each Physical operator uses pull-style messaging: starting from the bottom of the tree, it pulls its parent for records and the parent returns an Outcome status. The operators are designed to handle each possible outcome status, which can be STOP, OK, OK_WITH_NEW_SCHEMA or NONE. Since Drill supports flexible schema, in other words it can tolerate schema changes within the same dataset, the operators need to handle what happens when a new schema shows up for a set of records. For the benefits of columnar storage, see http://the-paper-trail.org/blog/columnar-storage/. Drill also implements its own in-memory columnar format called ValueVectors. A ValueVector is simply a collection of bytes that represents a run of values within the same column. Each pull in a Physical operator returns a RecordBatch, which can contain one or more ValueVectors per column, along with the schema.

Leaf fragments sent over the wire are executed as soon as they arrive at their target DrillBit. A leaf fragment is parsed into a DAG of physical operators.
Each physical operator uses a pull-style messaging model: starting from the bottom of the tree, an operator pulls records from its parent operator,
and the parent returns an outcome status. The operators are designed to handle every possible outcome status.
Because Drill supports flexible schema, i.e. it tolerates schema changes within the same dataset, the operators have to handle the case where a new schema shows up.

Drill also implements its own in-memory columnar data structure, the ValueVector: a collection of bytes representing a run of values within one column.
Each pull in a physical operator returns a RecordBatch, which contains one or more ValueVectors per column, along with the schema.
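
To make the pull model concrete, here is a minimal, self-contained sketch of a downstream operator driving its upstream and reacting to each outcome status. The types are toy stand-ins (Drill's real RecordBatch/ValueVector interfaces are richer); only the outcome names are taken from the description above.

import java.util.List;

// Toy stand-ins for the pull-style protocol described above; not Drill's real operator API.
enum Outcome { OK, OK_WITH_NEW_SCHEMA, NONE, STOP }

interface ToyUpstream {
  Outcome next();               // pull the next batch from the parent operator
  List<int[]> currentBatch();   // the current batch: one toy "value vector" per column
}

final class ToyOperator {
  private final ToyUpstream upstream;
  ToyOperator(ToyUpstream upstream) { this.upstream = upstream; }

  void run() {
    while (true) {
      switch (upstream.next()) {
        case OK_WITH_NEW_SCHEMA:
          rebuildForNewSchema();               // schema changed mid-dataset: rebuild schema-dependent state
          // fall through: the batch still has to be processed
        case OK:
          processBatch(upstream.currentBatch());
          break;
        case NONE:
          return;                              // upstream is exhausted, this branch of the query is done
        case STOP:
          throw new IllegalStateException("upstream requested a stop");
      }
    }
  }

  private void rebuildForNewSchema() { /* e.g. regenerate code, reallocate vectors */ }
  private void processBatch(List<int[]> columns) { /* the operator's actual work */ }
}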

At the very tip of the Leaf fragment in my slideshow example is the Scan operator, which is configured to look up a Parquet file and run it through the Parquet Storage Engine. The Storage engine is simply responsible for pulling in data from the data source, and converting them into ValueVectors and passes that back to its child as a RecordBatch.

It pulls data from the data source, converts it into ValueVectors, and passes those ValueVectors back to its child as a RecordBatch.

Looking at the fragment tree bottom-up, the bottom is the parent and the levels above are children; a child pulls records from its parent.
Looking top-down, the plan decomposes into root fragment -> intermediate fragments -> leaf fragments.
That looks contradictory at first: the leaf is the parent of the data flow and the levels above are its children, yet the topmost fragment is the root.

Steps of the scan operator (see the sketch below):
1. The leaf fragment pulls data from the data source.
2. The data is converted into ValueVectors.
3. The ValueVectors are assembled into a RecordBatch.
4. The RecordBatch is passed to the operator's child, i.e. the next level up.
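
A toy rendering of those four steps, with made-up types (a real Scan goes through the storage engine and builds proper ValueVectors): it pivots row-oriented source data into per-column arrays and bundles them as one batch.

import java.util.ArrayList;
import java.util.List;

// Toy sketch of the scan steps above: read source rows, pivot them into per-column
// "vectors", bundle them as a batch, and hand the batch to the child operator.
final class ToyScan {
  static final class ToyRecordBatch {            // stand-in for RecordBatch
    final List<int[]> columns;                   // one toy "value vector" per column
    final int rowCount;
    ToyRecordBatch(List<int[]> columns, int rowCount) { this.columns = columns; this.rowCount = rowCount; }
  }

  ToyRecordBatch scan(int[][] sourceRows) {      // step 1: rows pulled from the data source
    int columnCount = sourceRows[0].length;
    List<int[]> vectors = new ArrayList<>();
    for (int c = 0; c < columnCount; c++) {      // step 2: row-wise data -> columnar vectors
      int[] vector = new int[sourceRows.length];
      for (int r = 0; r < sourceRows.length; r++) {
        vector[r] = sourceRows[r][c];
      }
      vectors.add(vector);
    }
    return new ToyRecordBatch(vectors, sourceRows.length);  // steps 3-4: assemble the batch and pass it up
  }
}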

Eventually the Leaf fragment will take this batch, and through the Sender operator send it to the Intermediate DrillBit.

Eventually the leaf fragment takes these batches and sends them to an intermediate DrillBit through its Sender operator.
The data sources are only exposed to the leaf fragments of the physical plan; the leaf fragments are the ones that read the data.

The intermediate DrillBit, once it receives a RecordBatch for the first time, looks up the fragment in HazelCast by the fragment id carried in the RecordBatch, and sets up the Receiver and the physical operators necessary for processing on this DrillBit.

The first time an intermediate DrillBit receives a RecordBatch, it takes the fragment id carried in the RecordBatch, looks up the corresponding fragment in HazelCast,
and sets up the Receiver and whatever physical operators are needed to continue the processing on that DrillBit.

The intermediate fragment contains a Filtering operator; inside this operator, once it receives a RecordBatch it looks up the new schema and passes that to our CodeGeneration together with the specified filter expression and type information, and generates specific code that does the filtering. This is designed to avoid casting, run tight loops and leverage prefetching, since function calls can be eliminated. It is a very common approach, also seen in Hive's new vectorized query engine (via the Stinger initiative) and in Impala.

The intermediate fragment contains a Filtering operator. When it receives a RecordBatch, it looks up the new schema and passes it, together with the given filter expression and type information, to Drill's CodeGeneration, which emits specialized code that performs the filtering. The design avoids casting, runs tight loops and benefits from prefetching because function calls are eliminated; the same approach is common in Hive's new vectorized query engine (via the Stinger initiative) and in Impala.
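
Conceptually, what the generated code buys is a cast-free tight loop over a column's primitive values. A hand-written stand-in for what generated filter code for a predicate like age > 10 over an int column might look like (purely illustrative, not Drill's actual CodeGeneration output):

// Illustrative stand-in for runtime-generated filter code: no casts, no per-row virtual
// calls, just a tight loop that fills a selection vector with the indexes of surviving rows.
final class GeneratedFilterSketch {
  static int filterGreaterThan(int[] column, int rowCount, int threshold, int[] selectionVector) {
    int selected = 0;
    for (int i = 0; i < rowCount; i++) {
      if (column[i] > threshold) {
        selectionVector[selected++] = i;   // remember the row index that passed the predicate
      }
    }
    return selected;                       // how many rows survive the filter
  }
}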

The intermediate fragment eventually streams a batch at a time, as soon as it is available, to the Root DrillBit, where the Screen operator receives it and returns it to the client.

The intermediate fragment eventually streams one batch at a time, as soon as it is available, to the root DrillBit (the Foreman),
where the Screen operator receives the data and returns it to the client (the DrillBit that accepted the query is also the one that returns the results).

The DrillClient that receives the RecordBatch simply transforms the columnar ValueVectors into rows and shows the result to the client.

The DrillClient receives the RecordBatch, simply transforms the columnar ValueVectors into rows, and shows the result to the client.

This is overall what the flow looks like in our Drill alpha release, and we will be continuing to validate this architecture through diagnostic and benchmarking, and also provide more operators and storage engine so we can do much more interesting data analysis.

Physical Operator

Just as there is a LogicalOperator interface for logical operators, there is a PhysicalOperator interface for physical operators.

Let's look at HasAffinity first, the operators that have preferred nodes: getOperatorAffinity() returns the list of preferred endpoints.
It describes a physical operator that has affinity to particular DrillBit nodes, and it is used for assignment decisions.

//Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
public interface HasAffinity extends PhysicalOperator {
  public List<EndpointAffinity> getOperatorAffinity();
}

public class EndpointAffinity {
  private final DrillbitEndpoint endpoint;
  private double affinity = 0.0d;

EndpointAffinity captures affinity value for a given single Drillbit endpoint.
EndpointAffinity holds a reference to a DrillbitEndpoint; the comment speaks of an affinity value, so how affine an endpoint is can be computed.
The value starts at 0, and calling addAffinity() adds some amount of affinity for that Drillbit endpoint.
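
A self-contained sketch of that bookkeeping, using toy stand-ins for DrillbitEndpoint and EndpointAffinity (mirroring the fields above, not Drill's actual classes): a scan could add one unit of affinity to an endpoint for every HDFS block it hosts locally.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy stand-ins for the affinity bookkeeping described above.
final class AffinitySketch {
  static final class Endpoint {                      // stand-in for DrillbitEndpoint (address + ports)
    final String address;
    Endpoint(String address) { this.address = address; }
  }

  static final class Affinity {                      // stand-in for EndpointAffinity
    final Endpoint endpoint;
    double affinity = 0.0d;                          // starts at zero, as in the class above
    Affinity(Endpoint endpoint) { this.endpoint = endpoint; }
    void addAffinity(double delta) { affinity += delta; }
  }

  // One unit of affinity per block a Drillbit hosts locally.
  static List<Affinity> affinitiesFor(List<Endpoint> blockHosts) {
    Map<Endpoint, Affinity> byEndpoint = new LinkedHashMap<>();
    for (Endpoint host : blockHosts) {
      byEndpoint.computeIfAbsent(host, Affinity::new).addAffinity(1.0d);
    }
    return new ArrayList<>(byEndpoint.values());
  }
}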

The DrillbitEndpoint object is defined as a protobuf message, in Coordination.proto:

message DrillbitEndpoint{
  optional string address = 1;
  optional int32 user_port = 2;
  optional int32 control_port = 3;
  optional int32 data_port = 4;
  optional Roles roles = 5;
}

A Drillbit can be thought of as a Drill compute node; bin/drillbit.sh start launches one Drill service.

A LogicalPlan has a fixed layout: head, storage engines and query. A PhysicalPlan has one too, and the two share the same head.
The PhysicalPlan constructor takes a list of physical operators, whereas LogicalPlan's takes a collection of logical operators; in both cases the goal is to build a DAG.

@JsonPropertyOrder({ "head", "graph" })
public class PhysicalPlan {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class);

  PlanProperties properties;
  Graph<PhysicalOperator, Root, Leaf> graph;

  @JsonCreator
  public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
    this.properties = properties;
    this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
  }

Walking Through the Query Process

The official design docs at http://drill.apache.org/docs/rpc-overview/ devote only a few sentences to RPC and leave much to be filled in.

The Client Submits a Query

Following the query flow above step by step: first the client submits a query; once Optiq has produced the logical plan, it is handed to DrillClient to run:

/**
   * Submits a Logical plan for direct execution (bypasses parsing)
   * @param plan the plan to execute
   * @return a handle for the query result
   */
  public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
    UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
    ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
    client.submitQuery(listener, query);  // this client is the UserClient, not the DrillClient
    return listener.getResults();
  }

The logical plan is wrapped into a RunQuery message, and the ListHoldingResultsListener is there to tell the client when results can be fetched.
The listener wraps the result set in a Future: if the results are not ready yet, future.get() blocks, which is just how Java futures behave.

private class ListHoldingResultsListener implements UserResultsListener {
    private Vector<QueryDataBatch> results = new Vector<>();
    private SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
    private UserProtos.RunQuery query;  // kept so the client can resubmit the query after reconnecting if submission fails

    public void queryCompleted(QueryState state) {
      future.set(results);
    }
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
      results.add(result);
    }
    public List<QueryDataBatch> getResults() throws RpcException{
        return future.get();
    }   
  }
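
Putting the two snippets together, a caller would drive this entry point roughly as sketched below. This is an assumption-laden usage sketch, not verbatim API: connect(), close(), QueryType.LOGICAL, getHeader().getRowCount() and release() are taken to exist based on the surrounding code and common Drill usage; the plan string is a placeholder and imports are omitted.

// Usage sketch (imports omitted; names follow the snippets above; release()/close()/getRowCount() are assumptions).
public static void main(String[] args) throws Exception {
  DrillClient client = new DrillClient();
  client.connect();                                    // locate a Drillbit and open the user RPC channel
  String logicalPlan = "...";                          // a JSON logical plan produced by the planner
  List<QueryDataBatch> batches = client.runQuery(QueryType.LOGICAL, logicalPlan);  // blocks on future.get()
  for (QueryDataBatch batch : batches) {
    System.out.println("rows in batch: " + batch.getHeader().getRowCount());
    batch.release();                                   // free the batch's direct-memory buffers
  }
  client.close();
}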

UserClient, acting as the user's client, sends a RUN_QUERY request to the DrillBit; the payload is the RunQuery query object.

public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
  private final QueryResultHandler queryResultHandler = new QueryResultHandler();

  public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
    send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
  }

The connection object is the client's connection to the server, created in the constructor of UserClient's parent class BasicClient.
Netty is used here, and during setup the client also registers several handlers: the protocol encoder/decoder, the message encoder/decoder, the InboundHandler for arriving messages, and the client handshake handler.

public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends RpcBus<T, R> {
  private final Bootstrap b;
  protected R connection;
  public BasicClient(...) {  
    b = new Bootstrap()  
        .handler(new ChannelInitializer<SocketChannel>() {
          protected void initChannel(SocketChannel ch) throws Exception {
            connection = initRemoteConnection(ch);
            ch.closeFuture().addListener(getCloseHandler(ch, connection));
            final ChannelPipeline pipe = ch.pipeline();
            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
            pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
            pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
            pipe.addLast("handshake-handler", new ClientHandshakeHandler());
            pipe.addLast("message-handler", new InboundHandler(connection));
            pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
          }
        });    
  }

send() ultimately calls the send method of the same name on RpcBus. The first parameter, listener, is the RPC outcome listener (for outgoing, as opposed to incoming, messages);
the payload RunQuery query becomes the protobufBody, and the last parameter, dataBodies, is optional.

// The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
  protected final CoordinationQueue queue = new CoordinationQueue(16, 16);

  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
      SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
      ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
      channelFuture.addListener(futureListener);
      channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
  }

How does the client's query reach the server? Via connection.getChannel().writeAndFlush(m):
the client writes an RPC message to the server through the connection's channel; RPC messages are either inbound or outbound.
The OutboundRpcMessage carries the protobuf body plus a data part. The protocol only defines the format of the data; the actual data also has to travel along.
Although this is the client's query request, dataBodies has no value here, because no data bodies were passed in at the original call site.

At this point the client's query request is on its way, and the rest of the flow is up to the server.
The futureListener used here exists so the client can learn when the server has the data ready.
The queue puts the CoordinationId and the RpcListener into a map, and removes the entry once the data has been received.

public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) {
    int i = circularInt.getNext();
    RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
    Object old = map.put(i, future);
    return future;
  }
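
The essence of CoordinationQueue is request/response correlation: every outgoing REQUEST is assigned a coordinationId and its listener is parked under that id; the matching RESPONSE carries the same id back and wakes exactly that listener. A self-contained sketch of just that mechanism (not Drill's classes):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy version of the coordinationId bookkeeping: the sender parks a future under a
// fresh id, the wire carries the id out and back, and the response completes the future.
final class CorrelationSketch<V> {
  private final AtomicInteger nextId = new AtomicInteger();
  private final Map<Integer, CompletableFuture<V>> pending = new ConcurrentHashMap<>();

  int register(CompletableFuture<V> future) {          // called on send(): queue.get(listener, ...)
    int id = nextId.incrementAndGet();
    pending.put(id, future);
    return id;                                          // goes out in the OutboundRpcMessage header
  }

  void complete(int coordinationId, V value) {          // called when the RESPONSE arrives: queue.getFuture(...)
    CompletableFuture<V> future = pending.remove(coordinationId);
    if (future != null) {
      future.complete(value);                           // unblocks whoever is waiting on future.get()
    }
  }
}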

The Server Handles the Query Request

The server is bootstrapped the same way as the client, through Netty, and it also registers an InboundHandler.
Since the client sent a REQUEST in the previous step, the server's InboundHandler can receive it:

protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
    private final C connection;
    public InboundHandler(C connection) {
      this.connection = connection;
    }

    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
      final Channel channel = connection.getChannel();
      final Stopwatch watch = new Stopwatch().start();

        switch (msg.mode) {
        case REQUEST: {
            // handle message and ack.
            ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
            handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
            break;
        }

The InboundHandler here extends Netty's MessageToMessageDecoder, so the receiving logic is overridden in decode().
If it extended Netty's ChannelInboundHandlerAdapter instead, the method to override would be channelRead, which is the more common pattern in Netty hello-world examples.
Why is a ResponseSender needed? Because once the server has accepted the client's query request and obtained a result, it must send that response back to the client to complete the exchange.

protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
    sender.send(handle(connection, rpcType, pBody, dBody));
  }

  protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;

handle is an abstract method; since we are on the server, UserServer's implementation is the one used. The rpcType=RUN_QUERY sent by the client equals RUN_QUERY_VALUE below.

public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
  final UserWorker worker;

  protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody){
    switch (rpcType) {
    case RpcType.RUN_QUERY_VALUE:
        final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
        final QueryId queryId = worker.submitWork(connection, query);
        return new Response(RpcType.QUERY_HANDLE, queryId);

DrillClient hands the query to UserClient; UserServer hands the actual query work to a UserWorker, whose return value is also a QueryId message.
The final return value is a Response, matching RpcBus's sender.send(handle(...)) --> sender.send(Response).

Note that once the server receives the RUN_QUERY request it hands it to the worker, and the worker immediately returns the QueryId for that query; the results are therefore not returned right away.
Now look at the server-side ResponseSender implementation, defined in RpcBus:

private class ResponseSenderImpl implements ResponseSender {
    RemoteConnection connection;
    int coordinationId;

    public ResponseSenderImpl(RemoteConnection connection, int coordinationId) {
      super();
      this.connection = connection;
      this.coordinationId = coordinationId;
    }

    public void send(Response r) {
      OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId, r.pBody, r.dBodies);
      connection.getChannel().writeAndFlush(outMessage);
    }
  }

Note that coordinationId is the number of the futureListener the client put into its queue, and the QueryId returned by the server travels as r.pBody.
The server writes the OutboundRpcMessage back through the connection channel, i.e. back to the client. Since the server has not computed the query results yet,
it needs to return the QueryId to the client, and it will notify the client when the results become available.

What the server sends is a Response, so the server-side flow is now complete (the worker has not finished, but this particular RPC has), and it is the client's turn to receive the response.

The Client Receives the Query Result

We are still inside RpcBus's InboundHandler, except this time it is the client receiving the response sent by the server:

case RESPONSE:
            MessageLite m = getResponseDefaultInstance(msg.rpcType);  // here this is a QueryId
            RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());  // matches the queue.get(...) made when the request was sent
            Parser<?> parser = m.getParserForType();
            Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));  // pBody is only the protocol message
            rpcFuture.set(value, msg.dBody);  // dBody carries the actual data
          break;

The future is fetched from the queue and its set method is called to put the data into value, so that future.get() can return that value.

public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
    RpcOutcome<?> rpc = map.remove(coordinationId);
    return (RpcOutcome<V>) rpc;
  }
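
On the client side the blocking works the same way through Guava's SettableFuture, which ListHoldingResultsListener uses: getResults() blocks in future.get() until queryCompleted() calls future.set(results). A minimal standalone illustration of that behaviour:

import com.google.common.util.concurrent.SettableFuture;
import java.util.Arrays;
import java.util.List;

// Minimal demo of the blocking behaviour: get() parks the caller until set() fires.
public final class FutureDemo {
  public static void main(String[] args) throws Exception {
    SettableFuture<List<String>> future = SettableFuture.create();

    new Thread(() -> {
      try { Thread.sleep(200); } catch (InterruptedException ignored) {}
      future.set(Arrays.asList("batch-1", "batch-2"));   // plays the role of queryCompleted()
    }).start();

    System.out.println(future.get());                     // plays the role of getResults(): blocks until set()
  }
}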

Listener & Handler

Step / Operation:
1. DrillClient.runQuery: creates the ListHoldingResultsListener.
2. UserClient.submitQuery: QueryResultHandler wraps the ListHoldingResultsListener (a UserResultsListener) into a SubmissionListener (an RpcOutcomeListener).
3. RpcBus.send: wraps the RpcOutcomeListener into a ChannelListenerWithCoordinationId, which is also an RpcOutcome.
4. InboundHandler, RESPONSE case: fetches the RpcOutcome (rpcFuture) from the queue and sets the value on it.
5. RpcListener.set: calls RpcOutcomeListener.success, i.e. SubmissionListener.success, which registers the UserResultsListener that listens for the results.
6. ListHoldingResultsListener.getResults: reads the results out of the Future.

Object / Why:
ListHoldingResultsListener: holds the result set, so it is responsible for the final retrieval; the retrieval work itself does not happen here, it merely collects the data.
SubmissionListener: the submission listener; once the job is submitted it is in charge of the execution, although the execution does not happen here either.
UserResultsListener: the user's results listener, where the results are supposed to arrive; it is only an interface.
RpcListener: the RPC listener, which has to be associated with this query's coordinationId.

The methods of the user's results listener trace the phases of a query:
1. The QueryId arrives; it is generated by the server.
2. Data arrives and is received by the listener.
3. The query completes; the listener will receive no further data.

public interface UserResultsListener {
  /**
   * QueryId is available. Called when a query is successfully submitted to the server.
   * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
   */
  void queryIdArrived(QueryId queryId);

  void submissionFailed(UserException ex);

  /**
   * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received
   * @param result data batch received
   */
  void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);

  /**
   * The query has completed (successful completion or cancellation). The listener will not receive any other
   * data or result message. Called when the server returns a terminal, non-failing state (COMPLETED or CANCELLED)
   */
  void queryCompleted(QueryState state);
}
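
As a sketch, a listener written against exactly this interface might just count batches and let the caller block until the terminal callback fires (a hypothetical class; release() on the batch is an assumption, Drill imports are omitted, the rest follows the interface above):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical listener built against the interface above: it only counts batches;
// a caller blocks on await() until queryCompleted()/submissionFailed() fires.
public class CountingResultsListener implements UserResultsListener {
  private final CountDownLatch done = new CountDownLatch(1);
  private final AtomicLong batches = new AtomicLong();

  @Override
  public void queryIdArrived(QueryId queryId) {
    System.out.println("query accepted, id = " + queryId);
  }

  @Override
  public void submissionFailed(UserException ex) {
    ex.printStackTrace();
    done.countDown();                       // terminal: no more callbacks after a failure
  }

  @Override
  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
    batches.incrementAndGet();
    result.release();                       // assumption: the consumer frees each batch's buffers
  }

  @Override
  public void queryCompleted(QueryState state) {
    System.out.println("finished in state " + state + " after " + batches + " batches");
    done.countDown();                       // terminal: COMPLETED or CANCELLED
  }

  public void await() throws InterruptedException {
    done.await();
  }
}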

Fragment
