版权声明:本文为xpleaf(香飘叶子)博主原创文章,遵循CC 4.0 BY-SA 版权协议,转载请附上原文出处连接和本声明。java
本文较为系统、全面而且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但因为做者认知水平有限,文中不免会出现描述不许确的措辞,还请多多包容和指出。node
Kyuubi是网易数帆旗下易数大数据团队开源的一个高性能的通用JDBC和SQL执行引擎,创建在Apache Spark之上,Kyuubi的出现,较好的弥补了Spark ThriftServer在多租户、资源隔离和高可用等方面的不足,是一个真正能够知足大多数生产环境场景的开源项目。git
经过分析Spark ThriftServer的设计与不足,本文会逐渐带你深刻理解Kyuubi的核心设计与实现,同时会选取多个关键场景来剖析其源码,经过本文的阅读,但愿能让读者对网易Kyuubi的总体架构设计有一个较为清晰的理解,并可以用在本身的生产环境中解决更多实际应用问题。github
本文主要主要选取Kyuubi 1.1.0版原本对其设计与实现进行分析,后续的版本迭代社区加入了数据湖等概念和实现,本文不会对这方面的内容进行探讨。sql
在最初使用Spark时,只有理解了Spark RDD模型和其提供的各类算子时,才能比较好地使用Spark进行数据处理和分析,显然因为向上层暴露了过多底层实现细节,Spark有必定的高使用门槛,在易用性上对许多初入门用户来讲并不太友好。apache
SparkSQL的出现则较好地解决了这一问题,经过使用SparkSQL提供的简易API,用户只须要有基本的编程基础而且会使用SQL,就能够借助Spark强大的快速分布式计算能力来处理和分析他们的大规模数据集。编程
而Spark ThriftServer的出现使Spark的易用性又向前迈进了一步,经过提供标准的JDBC接口和命令行终端的方式,平台开发者能够基于其提供的服务来快速构建它们的数据分析应用,普通用户甚至不须要有编程基础便可借助其强大的能力来进行交互式数据分析。缓存
顾名思义,本质上,Spark ThriftServer是一个基于Apache Thrift框架构建而且封装了SparkContext的RPC服务端,或者从Spark的层面来说,咱们也能够说,Spark ThriftServer是一个提供了各类RPC服务的Spark Driver。但无论从哪一个角度去看Spark ThriftServer,有一点能够确定的是,它是一个Server,是须要对外提供服务的,所以其是常驻的进程,并不会像通常咱们构建的Spark Application在完成数据处理的工做逻辑后就退出。其总体架构图以下所示:安全
Apache Thrift是业界流行的RPC框架,经过其提供的接口描述语言(IDL),能够快速构建用于数据通讯的而且语言无关的RPC客户端和服务端,在带来高性能的同时,大大下降了开发人员构建RPC服务的成本,所以在大数据生态其有较多的应用场景,好比咱们熟知的hiveserver2便是基于Apache Thrift来构建其RPC服务。网络
当用户经过JDBC或beeline方式执行一条SQL语句时,TThreadPoolServer
会接收到该SQL,经过一系列的Session和Operation的管理,最终会使用在启动Spark ThriftServer时已经构建好的SparkContext来执行该SQL,并获取最后的结果集。
从上面的基本分析中咱们能够看到,在不考虑Spark ThrfitServer的底层RPC通讯框架和业务细节时,其总体实现思路是比较清晰和简单的。
固然实际上要构建一个对外提供SQL能力的RPC服务时,是有许多细节须要考虑的,而且工做量也会很是巨大,Spark ThriftServer在实现时实际上也没有本身重复造轮子,它复用了hiveserver2的许多组件和逻辑,并根据自身的业务需求来对其进行特定改造;一样的,后面当咱们去看Kyuubi时,也会发现它复用了hiveserver2和Spark ThriftServer的一些组件和逻辑,并在此基础上创新性地设计本身的一套架构。
这里列举的代码是基于Spark 2.1的源码,新版本在结构上可能有全部区别,但不影响咱们对其本质实现原理的理解。
前面提到的TThreadPoolServer
是Apache Thrift提供的用于构建RPC Server的一个工做线程池类,在Spark ThriftServer的Service体系结构中,ThriftBinaryService
正是使用TThreadPoolServer
来构建RPC服务端并对外提供一系列RPC服务接口:
Spark ThriftServer Service体系
ThriftBinaryService
基于TThreadPoolServer
构建RPC服务端
// org.apache.hive.service.cli.thrift.ThriftBinaryCLIService#run public class ThriftBinaryCLIService extends ThriftCLIService { @Override public void run() { // ...省略其它细节... // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); server.serve(); // ...省略其它细节... } }
ThriftBinaryService
提供的RPC服务接口
// org.apache.hive.service.cli.thrift.TCLIService.Iface TOpenSessionResp OpenSession(TOpenSessionReq req); TCloseSessionResp CloseSession(TCloseSessionReq req); TGetInfoResp GetInfo(TGetInfoReq req); TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req); TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req); TGetCatalogsResp GetCatalogs(TGetCatalogsReq req); TGetSchemasResp GetSchemas(TGetSchemasReq req); TGetTablesResp GetTables(TGetTablesReq req); TGetTableTypesResp GetTableTypes(TGetTableTypesReq req); TGetColumnsResp GetColumns(TGetColumnsReq req); TGetFunctionsResp GetFunctions(TGetFunctionsReq req); TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req); TCancelOperationResp CancelOperation(TCancelOperationReq req); TCloseOperationResp CloseOperation(TCloseOperationReq req); TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req); TFetchResultsResp FetchResults(TFetchResultsReq req); TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req); TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req); TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req);
能够看到,其提供的至关一部分接口都是提供SQL服务时所必要的能力。
固然,无论是使用标准的JDBC接口仍是经过beeline的方式来访问Spark ThriftServer,必然都是经过Spark基于Apache Thrift构建的RPC客户端来访问这些RPC服务接口的,所以咱们去看Spark ThriftServer提供的RPC客户端,其提供的方法接口与RPC服务端提供的是对应的,能够参考org.apache.hive.service.cli.thrift.TCLIService.Client。
若是比较难以理解,建议能够先研究一下RPC框架的本质,而后再简单使用一下Apache Thrift来构建RPC服务端和客户端,这样就会有一个比较清晰的理解,这里不对其底层框架和原理作更多深刻的分析。我的以为,要理解Spark ThriftServer,或是后面要介绍的Kyubbi,本质上是理解其通讯框架,也就是其是怎么使用Apache Thrift来进行通讯的,由于其它的细节都是业务实现。
Spark ThriftServer在带来各类便利性的同时,其不足也是显而易见的。
首先,Spark ThriftServer难以知足生产环境下多租户与资源隔离的场景需求。因为一个Spark ThriftServer全局只有一个SparkContext,也即只有一个Spark Application,其在启动时就肯定了全局惟一的用户名,所以在Spark ThriftServer的维护人员看来,全部经过Spark ThriftServer下发的SQL都是来自同一用户(也就是启动时肯定的全局惟一的用户名),尽管其背后其实是由使用Spark ThriftServer服务的不一样用户下发的,但全部背后的这些用户都共享使用了Spark ThriftServer的资源、权限和数据,所以咱们难以单独对某个用户作资源和权限上的控制,操做审计和其它安全策略。
在Spark ThriftServer执行的一条SQL实际上会被转换为一个job执行,若是用户A下发的SQL的job执行时间较长,必然也会阻塞后续用户B下发的SQL的执行。
其次,单个Spark ThriftServer也容易带来单点故障问题。从Spark ThriftServer接受的客户端请求和其与Executor的通讯来考虑,Spark ThriftServer自己的可靠性也难以知足生产环境下的需求。
所以,在将Spark ThriftServer应用于生产环境当中,上面说起的问题和局限性都会不可避免,那业界有没有比较好的解决方案呢?网易开源的Spark Kyuubi就给出了比较好的答案。
Kyuubi的总体架构设计以下:
Kyuubi从总体上能够分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其总体概述以下:
用户层
指经过不一样方式使用Kyuubi的用户,好比经过JDBC或beeline方式使用Kyuubi的用户。
服务发现层
服务发现层依赖于Zookeeper实现,其又分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。
Kyuubi Server层
由多个不一样的KyuubiServer实例组成,每一个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操做,只会做为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。
Kyuubi Engine层
由多个不一样的SparkSQLEngine实例组成,每一个SparkSQLEngine实例本质上为基于Apache Thrift实现的而且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并经过SparkSession实例来执行。在Kyuubi的USER共享层级上,每一个SparkSQLEngine实例都是用户级别的,即不一样的用户其会持有不一样的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。
下面将会对每一层以及它们的协做与交互展开较为详细的分析。
用户层就是指实际须要使用Kyuubi服务的用户,它们经过不过的用户名进行标识,以JDBC或beeline方式进行链接。
好比咱们能够在beeline中指定以不一样用户名进行登陆:
使用xpleaf用户名进行登陆
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
使用yyh用户名进行登陆
./beeline -u 'jdbc:hive2://10.2.10.1:10010' -n yyh
使用leaf用户名进行登陆
./beeline -u 'jdbc:hive2://10.2.10.2:10009' -n leaf
固然,这里的用户名或登陆标识并非能够随意指定或使用的,它应该根据实际使用场景由运维系统管理人员进行分配,而且其背后应当有一整套完整的认证、受权和审计机制,以确保总体系统的安全。
服务发现层主要是指Zookeepr服务以及Kyuubi Server层的KyuubiServer实例和Kyuubi Engine层的SparkSQLEngine在上面注册的命名空间(即node节点),以提供负载均衡和高可用等特性,所以它分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。
Kyuubi Server层的服务发现
Kyuubi Server层的服务发现是须要用户感知的。
KyuubiServer实例在启动以后都会向Zookeeper的/kyuubi
节点下面建立关于本身实例信息的节点,主要是包含KyuubiServer实例监听的host和port这两个关键信息,这样用户在链接KyuubiServer时,只须要到Zookeeper的/kyuubi
节点下面获取对应的服务信息便可,当有多个KyuubiServer实例时,选取哪个实例进行登陆,这个是由用户自行决定的,Kyuubi自己并不会进行干预。
在实际应用时也能够封装接口实现随机返回实例给用户,以免直接暴露Kyuubi的底层实现给用户。
另外,KyuubiServer实例是对全部用户共享,并不会存在特定KyuubiServer实例只对特定用户服务的问题。
固然在实际应用时你也能够这么作,好比你能够不对用户暴露服务发现,也就是不对用户暴露Zookeeper,对于不一样用户,直接告诉他们相应的KyuubiServer实例链接信息便可。不过这样一来,Kyuubi Server层的高可用就难以保证了。
好比有多个在不一样节点上启动的KyuubiServer实例,其在Zookeeper上面注册的信息以下:
/kyuubi/instance1_10.2.10.1:10009 /kyuubi/instance2_10.2.10.1:10010 /kyuubi/instance3_10.2.10.2:10009
Kyuubi Engine层的服务发现
Kyuubi Engine层的服务发现是不须要用户感知的,其属于Kyuubi内部不一样组件之间的一种通讯协做方式。
SparkSQLEngine实例在启动以后都会向Zookeeper的/kyuubi_USER
节点下面建立关于本身实例信息的节点,主要是包含该实例监听的host和port以及其所属user的相关信息,也就是说SparkSQLEngine实例并非全部用户共享的,它是由用户独享的。
好比Kyuubi系统中有多个不一样用户使用了Kyuubi服务,启动了多个SparkSQLEngine实例,其在Zookeeper上面注册的信息以下:
/kyuubi_USER/xpleaf/instance1_10.2.20.1:52643 /kyuubi_USER/yyh/instance2_10.2.10.1:52346 /kyuubi_USER/leaf/instance3_10.2.10.2:51762
Kyuubi Server层由多个不一样的KyuubiServer实例组成,每一个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操做,只会做为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。
整个Kyuubi系统中须要存在多少个KyuubiServer实例是由Kyuubi系统管理员决定的,根据实际使用Kyuubi服务的用户数和并发数,能够部署一个或多个KyuubiServer实例,以知足SLA要求。固然后续发现KyuubiServer实例不够时,能够横向动态扩容,只须要在Kyuubi中系统配置好host和port,启动新的KyuubiServer实例便可。
Kyuubi Engine层由多个不一样的SparkSQLEngine实例组成,每一个SparkSQLEngine实例本质上为基于Apache Thrift实现的而且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并经过SparkSession实例来执行。在Kyuubi的USER共享层级上,每一个SparkSQLEngine实例都是用户级别的,即不一样的用户其会持有不一样的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。
SparkSQLEngine实例是针对不一样的用户按需启动的。在Kyuubi总体系统启动以后,若是没有用户访问Kyuubi服务,实际上在整个系统中只有一个或多个KyuubiServer实例,当有用户经过JDBC或beeline的方式链接KyuubiServer实例时,其会在Zookeeper上去查找是否存在用户所属的SparkSQLEngine实例,若是没有,则经过spark-submit
提交一个Spark应用,而这个Spark应用自己就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,便可为特定用户执行相关SQL操做。
经过前面对各层的介绍,结合KyubbiServer架构图,以用户xpleaf
访问Kyuubi服务为例来描述整个流程。
1.Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的链接信息分别为10.2.10.1:10009
、10.2.10.1:10010
和10.2.10.2:1009
;
2.用户xpleaf经过beeline终端的方式链接了其中一个KyuubiServer实例;
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
在这里咱们假设用户xpleaf事先已经经过管理员告知的方式知道了该KyuubiServer实例的链接信息。
3.KyuubiServer_instance1接收到xpleaf的链接请求,会为该用户建立session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;
4.KyuubiServer_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其经过spark-submit的方式启动了一个SparkSQLEngine实例;
5.属于xpleaf用户的新的SparkSQLEngine_instance1实例在10.2.10.1
节点上进行启动,而且监听的52463
端口,启动后,其向Zookeeper注册本身的链接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463
;
6.KyuubiServer_instance1在检测到SparkSQLEngine_instance1启动成功后,会向其发送建立session会话的链接请求;
7.SparkSQLEngine_instance1收到KyuubiServer_instance1建立session会话的链接请求,则建立一个新的session会话;
8.用户启动beeleine完成并成功建立会话,接着用户执行SQL查询;
0: jdbc:hive2://10.2.10.1:10009> select * from teacher;
9.KyuubiServer_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;
10.KyuubiServer_instance1找到xpleaf所属的SparkSQLEngine_instance1实例,接着会为此次执行SQL的操做建立一个Operation;
11.KyuubiServer_instance1根据链接信息建立了一个RPC Client,而且构建SQL执行的RPC请求,发到对应的SparkSQLEngine_instance1实例上;
12.SparkSQLEngine_instance1接收到该请求后,会建立一个该SQL操做的Operation,而且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer_instance1;
13.KyuubiServer_instance1接收到SparkSQLEngine_instance1的执行结果,返回给用户,这样一次SQL查询操做就完成了。
0: jdbc:hive2://localhost:10009> select * from teacher; +-----------+------------+--------------+ | database | tableName | isTemporary | +-----------+------------+--------------+ | default | teacher | false | +-----------+------------+--------------+ 1 row selected (0.19 seconds)
透过总体协做流程咱们能够看到:
Kyuubi在总体Server端和Client端以及其实现功能的设计上,是十分清晰的。
经过前面对Kyuubi各层以及总体协做流程的描述,相信对Kyuubi的核心架构设计会有一个比较清晰的理解,这样再去分析Kyuubi的源码时就会简单不少。
首先咱们会来介绍Kyuubi总体的Service体系与组合关系,以对Kyuubi总体核心代码有一个概览性的理解,接着会选取多个关键场景来对Kyuubi的源码进行分析,而且给出每一个场景的代码执行流程图。
确实没有办法在较为简短的篇幅里为你们介绍Kyuubi源码的方方面面,但我我的认为无论对于哪一个大数据组件,在理解了其底层通讯框架的基础上,再选取关于该组件的几个或多个关键场景来分析其源码,基本上对其总体设计就会有概览性的理解,这样后面对于该组件可能出现的Bug进行排查与修复,或是对该组件进行深度定制以知足业务的实际需求,我相信问题都不大——这也就达到了咱们的目的,就是去解决实际问题。
固然,在这个过程中你也能够欣赏到漂亮的代码,这自己也是一种享受。
RPC
RPC(Remote Procedure Call)远程过程调用,若是按照百度百科的解释会很是羞涩难懂(上面提供的图应该仍是《TCP/IP详解卷1:协议》上面的一个图),但实际上咱们就能够简单地把它理解为,一个进程调用另一个进程的服务便可,无论是经过Socket、内存共享或是网络的方式,只要其调用的服务的具体实现不是在调用方的进程内完成的就能够,目前咱们见得比较多的是经过网络通讯调用服务的方式。
在Java语言层面上比较广泛的RPC实现方式是,反射+网络通讯+动态代理的方式来实现RPC,而网络通讯因为须要考虑各类性能指标,主要用的Netty或者原生的NIO比较多,Socket通常比较少用,好比能够看一下阿里Doubbo的实现。
若是想加深这方面的理解,能够参考个人一个开源RPC框架,其实就是很是mini版的Doubbo实现:https://github.com/xpleaf/minidubbo,建议有时间能够看下,实际上这会很是有用,由于几乎全部的大数据组件都会用到相关的RPC框架,无论是开源三方的仍是其本身实现的(好比Hadoop的就是使用本身实现的一套RPC框架)。
Apache Thrift
Apache Thrift是业界流行的RPC框架,经过其提供的接口描述语言(IDL),能够快速构建用于数据通讯的而且语言无关的RPC客户端和服务端,在带来高性能的同时,大大下降了开发人员构建RPC服务的成本,所以在大数据生态其有较多的应用场景,好比咱们熟知的hiveserver2便是基于Apache Thrift来构建其RPC服务。
在看Kyuubi的源码时,咱们能够把较多精力放在某几种较重要的类和其体系上,这样有助于咱们抓住重点,理解Kyuubi最核心的部分。仅考虑Kyuubi总体的架构设计和实现,比较重要的是Service、Session和Operation等相关的类和体系。
Service体系
Service,顾名思义就是服务,在Kyuubi中,各类不一样核心功能的提供都是经过其Service体系下各个实现类来进行提供的。咱们前面提到的服务发现层、Kyuubi Server层和Kyuubi Engine层,在代码实现上绝大部分核心功能都是由Kyuubi源码项目的Server类体系来完成的,能够这么说,理解了Service体系涉及类的相关功能,就基本上从源码级别上理解了整个Kyuubi的体系架构设计和实现。
固然这些Service的实现类并不必定使用Service结尾,好比SessionManager、OperationManager等,但基本上从名字咱们就能对其功能窥探一二。
其完整的继承关系以下:
基于Kyuubi提供的核心功能,咱们能够大体按Kyuubi Server层和Kyuubi Engine层来将整个体系中的Service类进行一个划分:
openSession
、executeStatement
、fetchResults
等;openSession
、executeStatement
、fetchResults
等;这里咱们只对具体实现类进行归类,由于中间抽象类只是提取多个子类的公共方法,不影响咱们对其体系功能的说明和讲解;而以Noop开头的其实是Kyuubi的测试实现类,所以咱们也不展开说明;KinitAuxiliaryService
是Kyuubi中用于认证的类,这里咱们不对其认证功能实现进行说明。
经过对Service体系各个具体实现类的介绍,再回顾前面对Kyuubi总体架构和协做流程的介绍,其抽象的功能在源码实现类上面就有了一个相对比较清晰的体现,而且基本上也是能够一一对应上的。
Service组合关系
为了理解Kyuubi在源码层面上是如何进行总体协做的,除了前面介绍的Service体系外,咱们还有必要理清其各个Service之间的组合关系。
在整个Service体系中,CompositeService
这个中间抽象类在设计上是须要额外关注的,它表示的是在它之下的实现类都至少有一个成员为其它Service服务类对象,好比对于KyuubiServer
,它的成员则包含有KyuubiBackdService
、KyuubiServiceDiscovery
等多个Service实现类,SparkSQLEngine
也是如此。
咱们将一些关键的Service类及其组合关系梳理以下,这对后面咱们分析关键场景的代码执行流程时会提供很清晰的思路参考:
Session与SessionHandle
当咱们使用经过JDBC或beeline的方式链接Kyuubi时,实际上在Kyuubi内部就为咱们建立了一个Session,用以标识本次会话的全部相关信息,后续的全部操做都是基于此次会话来完成的,咱们能够在一次会话下执行多个操做(好比屡次执行某次SQL,咱们只须要创建一次会话链接便可)。
Session在Kyuubi中又分为Kyuubi Server层的Session和Kyuubi Engine层的Session。Kyuubi Server层的Session实现类为KyuubiSessionImpl
,用来标识来自用户层的会话链接信息;Kyuubi Engine层的Session实现类为SparkSessionImpl
,用来标识来自Kyuubi Server层的会话链接信息。两个Session实现类都有一个共同的抽象父类AbstractSession
,用于Session操做的主要功能逻辑都是在该类实现的。
Session对象的存储实际上由SessionManager来完成,在SessionManager内部其经过一个Map来存储Session的详细信息,其中key为SessionHandle,value为Session对象自己。SessionHandle能够理解为就是封装了一个惟一标识一个用户会话的字符串,这样用户在会话创建后进行通讯时只须要携带该字符串标识便可,并不须要传输完整的会话信息,以免网络传输带来的开销。
Operation与OperationHandle
用户在创建会话后执行的相关语句在Kyuubi内部都会抽象为一个个的Operation,好比执行一条SQL语句对应的Operation实现类为Executement
,不过须要注意,Operation又分为Kyuubi Server层的KyuubiOperation
和Kyuubi Engine层的SparkOperation
。Kyuubi Server层的Operation并不会执行真正的操做,它只是一个代理,它会经过RPC Client请求Kyuubi Engine层来执行该Operation,所以全部Operation的真正执行都是在Kyuubi Engine层来完成的。
因为Operation都是创建在Session之下的,因此咱们在看前面的组合关系时能够看到,用于管理Operation的OperationManager为SessionManager的成员属性。
Operation对象的存储实际上由OprationManager来完成,在SessioOprationManagerManager内部其经过一个Map来存储Session的详细信息,其中key为OperationHandle,value为Operation对象自己。OperationHandle能够理解为就是封装了一个惟一标识一个用户操做的字符串,这样用户基于会话的操做时只须要携带该字符串标识便可,并不须要传输完整的操做信息,以免网络传输带来的开销。
第一次提交Operation时仍是须要完整信息,后续只须要提供OperationHandle便可,实际上SQL语句的执行在Kyuubi内部是异步执行的,用户端在提交Opeation后便可得到OperationHandle,后续只须要持着该OperationHandle去获取结果便可,咱们在分析SQL执行的代码时就能够看到这一点。
Kyuubi的启动实际上包含两部分,分别是KyuubiServer的启动和SparkSQLEngine的启动。KyuubiServer实例的启动发生在系统管理员根据实际业务须要启动KyuubiServer实例,这个是手动操做完成的;而SparkSQLEngine实例的启动则是在为用户创建会话时为由KyuubiServer实例经过spark-submit的方式去提交一个Spark应用来完成的。
KyuubiServer启动流程
当咱们在Kyuubi的bin目录下去执行./kyuubi run
命令去启动KyuubiServer时,就会去执行KyuubiServer的main方法:
def main(args: Array[String]): Unit = { info( """ | Welcome to | __ __ __ | /\ \/\ \ /\ \ __ | \ \ \/'/' __ __ __ __ __ __\ \ \____/\_\ | \ \ , < /\ \/\ \/\ \/\ \/\ \/\ \\ \ '__`\/\ \ | \ \ \\`\\ \ \_\ \ \ \_\ \ \ \_\ \\ \ \L\ \ \ \ | \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\ | \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/ | /\___/ | \/__/ """.stripMargin) info(s"Version: $KYUUBI_VERSION, Revision: $REVISION, Branch: $BRANCH," + s" Java: $JAVA_COMPILE_VERSION, Scala: $SCALA_COMPILE_VERSION," + s" Spark: $SPARK_COMPILE_VERSION, Hadoop: $HADOOP_COMPILE_VERSION," + s" Hive: $HIVE_COMPILE_VERSION") info(s"Using Scala ${Properties.versionString}, ${Properties.javaVmName}," + s" ${Properties.javaVersion}") SignalRegister.registerLogger(logger) val conf = new KyuubiConf().loadFileDefaults() UserGroupInformation.setConfiguration(KyuubiHadoopUtils.newHadoopConf(conf)) startServer(conf) }
在加载完配置信息后,经过调用startServer(conf)
方法,就开始了KyuubiServer的启动流程:
def startServer(conf: KyuubiConf): KyuubiServer = { if (!ServiceDiscovery.supportServiceDiscovery(conf)) { zkServer.initialize(conf) zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) conf.set(HA_ZK_ACL_ENABLED, false) } val server = new KyuubiServer() server.initialize(conf) server.start() sys.addShutdownHook(server.stop()) server }
能够看到,实际上KyuubiServer的启动包括两部分:初始化和启动。
KyuubiServer的初始化和启动其实是一个递归初始化和启动的过程。咱们前面提到,KyuubiServer为Service体系下的一个CompositeService
,参考前面给出的组合关系图,它自己的成员又包含了多个Service对象,它们都保存在保存在serviceList
这个成员当中,所以初始化和启动KyuubiServer实际上就是初始化和启动serviceList
中所包含的各个Service对象。而这些Service对象自己又多是CompositeService
,所以KyuubiServer的启动和初始化实际上就是一个递归初始化和启动的过程。
// 递归初始化serviceList下的各个服务 override def initialize(conf: KyuubiConf): Unit = { serviceList.foreach(_.initialize(conf)) super.initialize(conf) } // 递归启动serviceList下的各个服务 override def start(): Unit = { serviceList.zipWithIndex.foreach { case (service, idx) => try { service.start() } catch { case NonFatal(e) => error(s"Error starting service ${service.getName}", e) stop(idx) throw new KyuubiException(s"Failed to Start $getName", e) } } super.start() }
这样一来,整个KyuubiServer的启动流程就比较清晰了,这也是咱们在最开始就列出其Service体系和组合关系的缘由,因为总体的启动流程和细节所包含的代码比较多,咱们就没有必要贴代码了,这里我把整个初始化和启动流程步骤的流程图梳理了出来,待会再对其中一些须要重点关注的点进行说明,以下:
咱们重点关注一下FontendService
和ServiceDiscoveryService
的初始化和启动流程。
咱们须要重点关注一下FrontendService
,由于KyuubiServer实例对外提供RPC服务都是由其做为入口来完成的。
其初始化时主要是获取和设置了Apache Thrift内置的用于构建RPC服务端的TThreadPoolServer
的相关参数:
override def initialize(conf: KyuubiConf): Unit = synchronized { this.conf = conf try { hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf) val serverHost = conf.get(FRONTEND_BIND_HOST) serverAddr = serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost) portNum = conf.get(FRONTEND_BIND_PORT) val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS) val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS) val keepAliveTime = conf.get(FRONTEND_WORKER_KEEPALIVE_TIME) val executor = ExecutorPoolCaptureOom( name + "Handler-Pool", minThreads, maxThreads, keepAliveTime, oomHook) authFactory = new KyuubiAuthenticationFactory(conf) val transFactory = authFactory.getTTransportFactory val tProcFactory = authFactory.getTProcessorFactory(this) val serverSocket = new ServerSocket(portNum, -1, serverAddr) portNum = serverSocket.getLocalPort val tServerSocket = new TServerSocket(serverSocket) val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE) val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt val args = new TThreadPoolServer.Args(tServerSocket) .processorFactory(tProcFactory) .transportFactory(transFactory) .protocolFactory(new TBinaryProtocol.Factory) .inputProtocolFactory( new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS) .beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executor) // TCP Server server = Some(new TThreadPoolServer(args)) server.foreach(_.setServerEventHandler(new FeTServerEventHandler)) info(s"Initializing $name on host ${serverAddr.getCanonicalHostName} at port $portNum with" + s" [$minThreads, $maxThreads] worker threads") } catch { case e: Throwable => throw new KyuubiException( s"Failed to initialize frontend service on $serverAddr:$portNum.", e) } super.initialize(conf) }
能够看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,这些参数都是可配置的,关于其详细做用能够参考KyuubiConf
这个类的说明。
其启动比较简单,主要是调用TThreadPoolServer
的server()
方法来完成:
override def start(): Unit = synchronized { super.start() if(!isStarted) { serverThread = new NamedThreadFactory(getName, false).newThread(this) serverThread.start() isStarted = true } } override def run(): Unit = try { info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/") server.foreach(_.serve()) } catch { case _: InterruptedException => error(s"$getName is interrupted") case t: Throwable => error(s"Error starting $getName", t) System.exit(-1) }
初始化时主要是建立一个用于后续链接ZooKeeper的zkClient:
def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) setUpZooKeeperAuth(conf) _zkClient = buildZookeeperClient(conf) zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener { private val isConnected = new AtomicBoolean(false) override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = { info(s"Zookeeper client connection state changed to: $newState") newState match { case CONNECTED | RECONNECTED => isConnected.set(true) case LOST => isConnected.set(false) val delay = maxRetries.toLong * maxSleepTime connectionChecker.schedule(new Runnable { override def run(): Unit = if (!isConnected.get()) { error(s"Zookeeper client connection state changed to: $newState, but failed to" + s" reconnect in ${delay / 1000} seconds. Give up retry. ") stopGracefully() } }, delay, TimeUnit.MILLISECONDS) case _ => } } }) zkClient.start() super.initialize(conf) }
固然这里还看到其获取了一个HA_ZK_NAMESPACE
的配置值,其默认值为kyuubi
:
val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace") .doc("The root directory for the service to deploy its instance uri. Additionally, it will" + " creates a -[username] suffixed root directory for each application") .version("1.0.0") .stringConf .createWithDefault("kyuubi")
在ServiceDiscoveryService进行启动的时候,就会基于该namesapce来构建在Kyuubi Server层进行服务发现所须要的KyuubiServer实例信息:
override def start(): Unit = { val ns = ZKPaths.makePath(null, namespace) try { zkClient .create() .creatingParentsIfNeeded() .withMode(PERSISTENT) .forPath(ns) } catch { case _: NodeExistsException => // do nothing case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") try { _serviceNode = new PersistentEphemeralNode( zkClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, instance.getBytes(StandardCharsets.UTF_8)) serviceNode.start() val znodeTimeout = 120 if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) { throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted") } info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance) } catch { case e: Exception => if (serviceNode != null) { serviceNode.close() } throw new KyuubiException( s"Unable to create a znode for this server instance: $instance", e) } super.start() }
在这里,就会在Zookeeper的/kyuubi
节点下面建立一个包含KyuubiServer实例详细链接信息的节点,假设KyuubiServer实例所配置的host和post分别为10.2.10.1
和10009
,那么其所建立的zk节点为:
[zk: localhost:2181(CONNECTED) 87] ls /kyuubi [serviceUri=10.2.10.1:10009;version=1.1.0;sequence=0000000007]
咱们主要关注一下其启动过程:
// org.apache.kyuubi.session.SessionManager#start override def start(): Unit = { startTimeoutChecker() super.start() } // org.apache.kyuubi.session.SessionManager#startTimeoutChecker private def startTimeoutChecker(): Unit = { val interval = conf.get(SESSION_CHECK_INTERVAL) val timeout = conf.get(SESSION_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { val current = System.currentTimeMillis if (!shutdown) { for (session <- handleToSession.values().asScala) { if (session.lastAccessTime + timeout <= current && session.getNoOperationTime > timeout) { try { closeSession(session.handle) } catch { case e: KyuubiSQLException => warn(s"Error closing idle session ${session.handle}", e) } } else { session.closeExpiredOperations } } } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) }
在这里主要完成的事情:
1.获取session check interval;
2.获取session timout;
3.起一个schedule的调度线程;
4.根据interval和timeout对handleToSession的session进行检查;
5.若是session超时(超过timeout没有access),则closesession;
那么对于KyuubiServer的启动过程咱们就分析到这里,更多细节部分你们能够结合个人流程图来自行阅读代码便可,实际上当咱们把Kyuubi的Service体系和组合关系整理下来以后,再去分析它的启动流程时就会发现简单不少,这个过程当中无非就是要关注它的一些相关参数获取和设置是在哪里完成的,它是怎么侦听服务的(真正用于侦听host和port的server的启动)。
SparkSQLEngine启动流程
在KyuubiServer为用户创建会话时会去经过服务发现层去Zookeeper查找该用户是否存在对应的SparkSQLEngine实例,若是没有则经过spark-submit的启动一个属于该用户的SparkSQLEngine实例。
后面在分析KyuubiServer Session创建过程会提到,实际上KyuubiServer是经过调用外部进程命令的方式来提交一个Spark应用的,为了方便分析SparkSQLEngine的启动流程,这里我先将其大体的命令贴出来:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
kyuubi-spark-sql-engine-1.1.0.jar
是Kyuubi发布版本里面的一个jar包,里面就包含了SparkSQLEngine
这个类,经过-class
参数咱们能够知道,实际上就是要运行SparkSQLEngine的main方法,因为开启了SparkSQLEngine的启动流程。
须要说明的是,提交Sparkk App的这些参数在SparkSQLEngine启动以前都会被设置到SparkSQLEngine的成员变量kyuubiConf
当中,获取方法比较简单,经过scala提供的sys.props
就能够获取,这些参数在SparkSQLEngine的初始化和启动中都会起到十分关键的做用。
接下来咱们看一下SparkSQLEngine的main方法:
def main(args: Array[String]): Unit = { SignalRegister.registerLogger(logger) var spark: SparkSession = null var engine: SparkSQLEngine = null try { spark = createSpark() engine = startEngine(spark) info(KyuubiSparkUtil.diagnostics(spark)) // blocking main thread countDownLatch.await() } catch { case t: Throwable => error("Error start SparkSQLEngine", t) if (engine != null) { engine.stop() } } finally { if (spark != null) { spark.stop() } } }
首先会经过createSpark()
建立一个SparkSession对象,后续SQL的真正执行都会交由其去执行,其建立方法以下:
def createSpark(): SparkSession = { val sparkConf = new SparkConf() sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true") sparkConf.setIfMissing("spark.master", "local") sparkConf.setIfMissing("spark.ui.port", "0") val appName = s"kyuubi_${user}_spark_${Instant.now}" sparkConf.setIfMissing("spark.app.name", appName) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0) kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) // Pass kyuubi config from spark with `spark.kyuubi` val sparkToKyuubiPrefix = "spark.kyuubi." sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) => kyuubiConf.set(s"kyuubi.$k", v) } if (logger.isDebugEnabled) { kyuubiConf.getAll.foreach { case (k, v) => debug(s"KyuubiConf: $k = $v") } } val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() session.sql("SHOW DATABASES") session }
这里主要是设置了一些在建立SparkSession时须要的参数,包括appName、spark运行方式、spark ui的端口等。另外这里还特别对frontend.bind.port
参数设置为0,关于该参数自己的定义以下:
val FRONTEND_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.bind.port") .doc("Port of the machine on which to run the frontend service.") .version("1.0.0") .intConf .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number") .createWithDefault(10009)
能够看到其默认值为10009,前面KyuubiServer在构建TThreadPoolServer
时就直接使用了默认值,这也是咱们启动的KyuubiServer实例侦听10009端口的缘由,而在这里,也就是SparkSQLEngine启动时将其设置为0是有缘由,咱们将在下面继续说明。
建立完成SparkSession后才调用startEngine(spark)
方法启动SparkSQLEngine自己:
def startEngine(spark: SparkSession): SparkSQLEngine = { val engine = new SparkSQLEngine(spark) engine.initialize(kyuubiConf) engine.start() sys.addShutdownHook(engine.stop()) currentEngine = Some(engine) engine }
能够看到也是先进行初始化,而后再启动,SparkSQLEngine自己是CompositeService
,因此初始化和启动过程跟KyuubiServer是如出一辙的(固然其包含的成员会有所差异),都是递归对serviceList
中所包含的各个Service对象进行初始化和启动:
FrontendService在SparkSQLEngine中的启动流程与在KyuubiServer中的启动流程是基本同样的,能够参考前面的说明,这里主要说明一些比较细微的差异点。
前面已经设置了frontend.bind.port
参数的值为0,在FrontendService这个类当中,它会赋值给portNum
这个变量,用以构建TThreadPoolServer
所须要的参数ServerSocket
对象:
// org.apache.kyuubi.service.FrontendService#initialize val serverSocket = new ServerSocket(portNum, -1, serverAddr)
因此实际上,无论是KyuubiServer仍是SparkSQLEngine,其所侦听的端口是在这里构建ServerSocket对象的时候肯定下来的,对ServerSocket对象,若是传入一个为0的portNum,则表示使用系统随机分配的端口号,因此这也就是咱们在启动了SparkSQLEngine以后看到其侦听的端口号都是随机端口号的缘由。
与KyuubiServer相似,这里分析一下其差异点。
前面在经过spark-submit提交应用时传入了--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf
的参数,实际上在SparkSQLEngine初始化KyuubiConfig对象时会设置到KyuubiConfig.HA_ZK_NAMESPACE
属性上,所以在ServiceDiscoveryService初始化时获取的namespace实际上就为/kyuubi_USER/xpleaf
,而不是默认的kyuubi
,这点是须要注意的:
def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) // 省略其它代码 }
所以在启动调用start()方法时,其在Zookeeper上构建的znode节点也就不一样:
override def start(): Unit = { // 省略其它代码 val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") // 省略其它代码 }
好比其建立的znode节点为:
[zk: localhost:2181(CONNECTED) 94] ls /kyuubi_USER/xpleaf [serviceUri=10.2.10.1:52643;version=1.1.0;sequence=0000000004]
SparkSQLSessionManager也是继承自SessionManager,所以与KyuubiServer的KyuubiSessionManager同样,其也启动了一个用于检查Session是否超时的checker。
此外,还启动了另一个checker,以下:
// org.apache.kyuubi.engine.spark.SparkSQLEngine#start override def start(): Unit = { super.start() // Start engine self-terminating checker after all services are ready and it can be reached by // all servers in engine spaces. backendService.sessionManager.startTerminatingChecker() } // org.apache.kyuubi.session.SessionManager#startTerminatingChecker private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) { // initialize `_latestLogoutTime` at start _latestLogoutTime = System.currentTimeMillis() val interval = conf.get(ENGINE_CHECK_INTERVAL) val idleTimeout = conf.get(ENGINE_IDLE_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) { info(s"Idled for more than $idleTimeout ms, terminating") sys.exit(0) // Note:直接退出整个SparkSQLEngine,也就是App } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) }
实际上这个checker是在SparkSQLEngine递归初始化和启动其serviceList以前就已经启动,从它的实现当中咱们能够看到,当超过必定时时而且SparkSQLEngine维护的Session为0时,整个SparkSQLEngine实例就会退出,这样作的好处就是,若是一个用户的SparkSQLEngine实例长期没有被使用,咱们就能够将其占用的资源释放出来,达到节省资源的目的。
Kyuubi Session的创建实际上包含两部分,分别是KyuubiServer Session创建和SparkSQLEngine Session创建,这两个过程不是独立进行的,KyuubiServer Session的创建伴随着SparkSQLEngine Session的创建,KyuubiServer Session和SparkSQLEngine Session才完整构成了Kyuubi中可用于执行特定Operation操做的Session。
KyuubiServer Session创建过程
当用户经过JDBC或beeline的方式链接Kyuubi时,实际上就开启了KyuubiServer Session的一个创建过程,此时KyuubiServer中FrontedService的OpenSession
方法就会被执行:
// org.apache.kyuubi.service.FrontendService#OpenSession override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = { debug(req.toString) info("Client protocol version: " + req.getClient_protocol) val resp = new TOpenSessionResp try { val sessionHandle = getSessionHandle(req, resp) resp.setSessionHandle(sessionHandle.toTSessionHandle) resp.setConfiguration(new java.util.HashMap[String, String]()) resp.setStatus(OK_STATUS) Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle)) } catch { case e: Exception => warn("Error opening session: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true)) } resp }
进而开启了KyuubiServer Session创建以及后续SparkSQLEngine实例启动(这部分前面已经单独介绍)、SparkSQLEngine Session创建的过程:
总体流程并不复杂,在执行FrontendService#OpenSession方法时,最终会调用到KyuubiSessionImpl#open方法,这是整个KyuubiServer Session创建最复杂也是最为关键的一个过程,为此咱们单独将其流程整理出来进行说明:
流程中其实已经能够比较清晰地说明其过程,这里咱们再详细展开说下,其主要分为下面的过程:
第一次创建特定user的session时,在zk的/kyuubi_USER path下是没有相关user的节点的,好比/kyuubi_USER/xpleaf,所以在代码执行流程中,其获取的值会为None,这就触发了其调用外部命令来启动一个SparkSQLEngine实例:
// org.apache.kyuubi.session.KyuubiSessionImpl#open override def open(): Unit = { super.open() val zkClient = startZookeeperClient(sessionConf) logSessionInfo(s"Connected to Zookeeper") try { getServerHost(zkClient, appZkNamespace) match { case Some((host, port)) => openSession(host, port) case None => sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.toString) // tag is a seq type with comma-separated sessionConf.set(SparkProcessBuilder.TAG_KEY, sessionConf.getOption(SparkProcessBuilder.TAG_KEY) .map(_ + ",").getOrElse("") + "KYUUBI") sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) val builder = new SparkProcessBuilder(appUser, sessionConf) try { logSessionInfo(s"Launching SQL engine:\n$builder") val process = builder.start var sh = getServerHost(zkClient, appZkNamespace) val started = System.currentTimeMillis() var exitValue: Option[Int] = None while (sh.isEmpty) { if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { exitValue = Some(process.exitValue()) if (exitValue.get != 0) { throw builder.getError } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly() throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", builder.getError) } sh = getServerHost(zkClient, appZkNamespace) } val Some((host, port)) = sh openSession(host, port) } finally { // we must close the process builder whether session open is success or failure since // we have a log capture thread in process builder. builder.close() } } } finally { try { zkClient.close() } catch { case e: IOException => error("Failed to release the zkClient after session established", e) } } }
而调用的外部命令实际上就是咱们在前面讲解SparkSQLEngine实例中提到的spark-submit命令:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
以后就是SparkSQLEngine实例的启动过程,其启动完成以后,就会在Zookeeper上面注册本身的节点信息。
对于KyuubiSessionImpl#open方法,在不超时的状况下,循环会一直执行,直到其获取到用户的SparkSQLEngine实例信息,循环结束,进入下面跟SparkSQLEngine实例创建会话的过程。
SparkSQLEngine本质上也是一个RPC服务端,为了与其进行通讯以创建会话,就须要构建RPC客户端,这里KyuubiSessionImpl#openSession方法中构建RPC客户端的方法主要是Apache Thrift的一些模板代码,以下:
org.apache.kyuubi.session.KyuubiSessionImpl#openSession private def openSession(host: String, port: Int): Unit = { val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous") val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt transport = PlainSASLHelper.getPlainTransport( user, passwd, new TSocket(host, port, loginTimeout)) if (!transport.isOpen) { logSessionInfo(s"Connecting to engine [$host:$port]") transport.open() logSessionInfo(s"Connected to engine [$host:$port]") } client = new TCLIService.Client(new TBinaryProtocol(transport)) val req = new TOpenSessionReq() req.setUsername(user) req.setPassword(passwd) req.setConfiguration(conf.asJava) logSessionInfo(s"Sending TOpenSessionReq to engine [$host:$port]") val resp = client.OpenSession(req) logSessionInfo(s"Received TOpenSessionResp from engine [$host:$port]") ThriftUtils.verifyTStatus(resp.getStatus) remoteSessionHandle = resp.getSessionHandle sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle) }
在发送请求给SparkSQLEngine的时候,又会触发SparkSQLEngine Session创建的过程(这个接下来讲明),在跟其创建完Session以后,KyuubiSessionImpl会将其用于标识用户端会话的sessionHandle、用于跟SparkSQLEngine进行通讯的RPC客户端和在SparkSQLEngine实例中进行Session标识的remoteSessionHandle缓存下来,这样在整个Kyuubi体系中,就构建了一个完整的Session映射关系:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,后续的Operation都是创建在这样一个体系之下。
KyuubiServer在Session创建完成后会给客户端返回一个SessionHandle,后续客户端在与KyuubiServer进行通讯时都会携带该SessionHandle,以标识其用于会话的窗口。
SparkSQLEngine Session创建过程
在接收到来自KyuubiServer的创建会话的RPC请求以后,SparkSQLEngine中FrontedService的OpenSession
方法就会被执行,其总体流程与KyuubiServer Session的创建过程是相似的,主要不一样在于SparkSQLSessionManager#openSession方法执行上面,以下:
其对应的关键代码以下:
// org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#openSession override def openSession( protocol: TProtocolVersion, user: String, password: String, ipAddress: String, conf: Map[String, String]): SessionHandle = { info(s"Opening session for $user@$ipAddress") val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this) val handle = sessionImpl.handle try { val sparkSession = spark.newSession() // 省略非核心代码 sessionImpl.open() operationManager.setSparkSession(handle, sparkSession) setSession(handle, sessionImpl) info(s"$user's session with $handle is opened, current opening sessions" + s" $getOpenSessionCount") handle } catch { case e: Exception => sessionImpl.close() throw KyuubiSQLException(e) } }
sessionImpl.open()
实际上只是作了日志记录的一些操做,因此其实这里的核心是将建立的Session记录下来。
SparkSQLEngine在Session创建完成后会给KyuubiServer返回一个SessionHandle,后续KyuubiServer在与SparkSQLEngine进行通讯时都会携带该SessionHandle,以标识其用于会话的窗口。
Kyuubi SQL的执行流程实际上包含两部分,分别是KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程,其结合起来才是一个完整的SQL执行流程,KyuubiServer只是一个代理,真正的SQL执行是在SparkSQLEngine中完成。
另外因为在Kyuubi中,SQL的执行是异步的,也就是能够先提交一个SQL让其去执行,后续再经过其返回的operationHandle去获取结果,因此在KyuubiServer和SparkSQLEngine内部,SQL的执行流程又能够再细分为提交Statement和FetchResults两个过程,在分别分析KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程时,咱们就是对提交Statment和FetchResults这两个过程来展开详细的分析,总体会有些繁多,但并不复杂。
KyuubiServer SQL执行流程
当用户经过JDBC或beeline的方式执行一条SQL语句时,就开启了SQL语句在Kyuubi中的执行流程,此时KyuubiServer中FrontedService的ExecuteStatement
方法就会被执行:
override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = { debug(req.toString) val resp = new TExecuteStatementResp try { val sessionHandle = SessionHandle(req.getSessionHandle) val statement = req.getStatement val runAsync = req.isRunAsync // val confOverlay = req.getConfOverlay val queryTimeout = req.getQueryTimeout val operationHandle = if (runAsync) { be.executeStatementAsync(sessionHandle, statement, queryTimeout) } else { be.executeStatement(sessionHandle, statement, queryTimeout) } resp.setOperationHandle(operationHandle.toTOperationHandle) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error executing statement: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp }
runAsync
值为true,所以会经过异步的方式来执行SQL,也就是会执行BackendService的executeStatementAsync方法,开启了异步执行SQL的流程:
首先会经过KyuubiOperationManager去建立一个表示执行SQL的ExecuteStatement:
// org.apache.kyuubi.operation.KyuubiOperationManager#newExecuteStatementOperation override def newExecuteStatementOperation( session: Session, statement: String, runAsync: Boolean, queryTimeout: Long): Operation = { val client = getThriftClient(session.handle) val remoteSessionHandle = getRemoteTSessionHandle(session.handle) val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync) addOperation(operation) }
client实际上就是咱们前面在KyuubiServer Session创建过程当中创建的用于与SparkSQLEngine通讯的RPC客户端,ExecuteStatement须要client来发送执行SQL语句的请求给SparkSQLEngine实例,不过须要注意的是,这里的ExecuteStatement是KyuubiServer体系下的,其类全路径为org.apache.kyuubi.operation.ExecuteStatement
,由于后面在分析SparkSQLEngine SQL执行流程时,在SparkSQLEngine体系下也有一个ExecuteStatement,但其类全路径为org.apache.kyuubi.engine.spark.operation.ExecuteStatement
。
这里的整个流程关键在于后面执行operation.run()
方法,进而执行runInternal()
方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { setState(OperationState.RUNNING) executeStatement() setState(OperationState.FINISHED) } }
这里会经过异步的方式来执行,其先同步执行executeStatement()方法,而后再提交一个异步线程来执行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)
实际上就是经过线程池来提交一个线程线程),咱们先看一下其executeStatement()方法:
// org.apache.kyuubi.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { val req = new TExecuteStatementReq(remoteSessionHandle, statement) req.setRunAsync(shouldRunAsync) val resp = client.ExecuteStatement(req) verifyTStatus(resp.getStatus) _remoteOpHandle = resp.getOperationHandle } catch onError() }
这里statement实际上就是要执行的SQL语句,因此本质上就是向SparkSQLEngine发送了一个用于执行SQL语句的RPC请求,这样就会触发SparkSQLEngine执行提交Statement的一个过程(这个接下来会分析),请求成功后,KyuubiServer会将SparkSQLEngine实例用于记录该操做的operationHandle记录下来,就是赋值给成员变量_remoteOpHandle
,_remoteOpHandle
用后续用于查询statement在SparkSQLEngine实例中的执行状态和FetchResults。
执行完executeStatement()方法后,咱们再看一下其提交异步线程时所执行的操做,也就是waitStatementComplete()方法:
// org.apache.kyuubi.operation.ExecuteStatement#waitStatementComplete // TODO 主要是更新该Operation的State为FINISHED,这样后面取数据时才知道已经执行完成 private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle) private def waitStatementComplete(): Unit = { setState(OperationState.RUNNING) // 由于FetchResults有进行检查,assertState(OperationState.FINISHED) var statusResp = client.GetOperationStatus(statusReq) var isComplete = false while (!isComplete) { getQueryLog() verifyTStatus(statusResp.getStatus) val remoteState = statusResp.getOperationState info(s"Query[$statementId] in ${remoteState.name()}") isComplete = true remoteState match { case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE => isComplete = false statusResp = client.GetOperationStatus(statusReq) case FINISHED_STATE => setState(OperationState.FINISHED) // 省略其它代码 setOperationException(ke) } } // see if anymore log could be fetched getQueryLog() }
能够看到其主要操做是构建用于查询SparkSQLEngine实例中Operation的执行状态。
再回过来看一下runInternal()
方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { // 省略其它代码 } }
这里提交一个线程后的返回结果backgroundOperation实际上为一个FutureTask对象,后续在FetchResults过程当中经过该对象就能够知道Operation在SparkSQLEngine实例中的执行状态。
在提交完Statement以后,KyuubiServer会将operationHandle返回给用户端,用于后续获取执行结果。
提交完Statement后,用户层的RPC客户端就会去获取结果,此时KyuubiServer中FrontedService的FetchResults
方法就会被执行:
// org.apache.kyuubi.service.FrontendService#FetchResults override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = { debug(req.toString) val resp = new TFetchResultsResp try { val operationHandle = OperationHandle(req.getOperationHandle) val orientation = FetchOrientation.getFetchOrientation(req.getOrientation) // 1 means fetching log val fetchLog = req.getFetchType == 1 val maxRows = req.getMaxRows.toInt val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog) resp.setResults(rowSet) resp.setHasMoreRows(false) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error fetching results: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp }
在获取真正执行结果以前,会有屡次获取操做日志的请求,也就是req.getFetchType == 1
的状况,这里咱们只关注fetchLog
为false的状况:
获取执行结果的过程就比较简单,主要是调用RPC客户端的FetchResults方法,这样就会触发SparkSQLEngine FetchResults的一个过程(这个接下来会分析),不过在获取执行结果前会检查其执行状态,前面在分析在提交Statement时,异步线程waitStatementComplete()就会请求SparkSQLEngine更新其状态为FINISHED,所以这里能够正常获取执行结果。
SparkSQLEngine SQL执行流程
接收到KyuubiServer提交Statement的RPC请求时,此时SparkSQLEngine中FrontedService的ExecuteStatement
方法就会被执行,进而触发接下来提交Statement的整个流程:
其总体流程与KyuubiServer是十分类似的,主要区别在于:
1.其建立的Statement为SparkSQLEngine体系下的ExecuteStatement;
2.其异步线程是经过SparkSession来执行SQL语句;
所以咱们来看一下其runInternal()
方法和异步线程执行的executeStatement()
方法:
// org.apache.kyuubi.engine.spark.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { val asyncOperation = new Runnable { override def run(): Unit = { OperationLog.setCurrentOperationLog(operationLog) executeStatement() } } try { val sparkSQLSessionManager = session.sessionManager val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) throw ke } } else { executeStatement() } } // org.apache.kyuubi.engine.spark.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { setState(OperationState.RUNNING) info(KyuubiSparkUtil.diagnostics(spark)) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) spark.sparkContext.setJobGroup(statementId, statement) result = spark.sql(statement) debug(result.queryExecution) iter = new ArrayFetchIterator(result.collect()) setState(OperationState.FINISHED) } catch { onError(cancel = true) } finally { spark.sparkContext.clearJobGroup() } }
能够看到其执行很是简单,就是直接调用SparkSession的sql()方法来执行SQL语句,最后再将结果保存到迭代器iter,并设置执行状态为完成。
在提交完Statement以后,SparkSQLEngine会将operationHandle返回给KyuubiServer,用于后续获取执行结果。
接收到KyuubiServer获取结果的RPC请求时,此时SparkSQLEngine中FrontedService的FetchResults
方法就会被执行,进而触发接下来FetchResults的整个流程:
整个过程比较简单,就是将iter的结果转换为rowSet的对象格式,最后返回给KyuubiServer。