Linkis JDBC模块设计介绍

目录
javascript

1、 背景介绍
2、 使用介绍
(1)引入依赖模块
(2)创建测试类
3、 JDBC模块设计方案
(1)驱动类UJESSQLDriver
(2)JDBC链接器UJESSQLConnection
3)执行对象UJESSQLStatement/UJESSQLPreStatement
(4)结果集UJESSQLResultSet
(5)错误码方案
4、 实现方案总结
5、 参考文献

相关文章分享: Linkis JDBC是如何适配Tableau的?
 

01html

前端

背景介绍java


Linkis做为大数据平台中间件,链接了底层的计算存储和上层的开发应用,统一了任务的调度和执行,在JDBC模块开发出来以前,向Linkis提交SQL执行任务到Spark、Hive执行支持websocket和restful的方式。为了多样化与Linkis的交互方式,便捷用户开发流程,轻量化客户端的任务提交过程,JDBC的支持无疑是很是值得考虑的。

JDBC(Java Data Base Connectivity, java数据库链接)是一种用于执行SQL语句的Java API,能够为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成。JDBC提供了一种基准,据此能够构建更高级的工具和接口,使数据库开发人员可以编写数据库应用程序。JDBC提供的主要功能是:一、同一个数据库创建链接;二、向数据库发送SQL语句;三、处理数据库返回的结果。

在Linkis中,JDBC模块属于Linkis-UJES下面的一个子模块,UJES即Unified Job Execution Service(统一做业执行服务),UJES是Linkis最第一版本的雏形,提供了基础的任务提交和结果集查询的对外服务,查询时JDBC向Linkis-UJES客户端提交SQL执行,返回获得结果集,用户在只须要像使用mysql的JDBC那样操做,大大下降了学习成本,而实现这些功能仅仅须要用户引入一个Linkis JDBC的JAR包便可。

须要区分的是,Linkis还提供JDBC引擎,例如在DataSphereStudio中能够建立JDBC脚本经过JDBC引擎提交任务到Linkis,但该引擎仅模拟实现了JDBC的部分方法,并无规范性的实现sun提供的JDBC 4.0的完整接口方案,没法向外提供规范的SDK服务。而本文所指的Linkis JDBC模块是实现了一套接口的完整方案。

 

02mysql

web

使用介绍sql


(1)引入依赖模块

第一种方式在pom里面依赖JDBC模块:
  
  
   
   
            
   
   
    
    
     
     
              
     
     

<dependency> <groupId>com.webank.wedatasphere.linkis</groupId> <artifactId>linkis-ujes-jdbc</artifactId> <version>0.9.1</version> </dependency>
  
  
   
   
            
   
   
注意若是引入不到该jar包,须要在ujes/jdbc目录里面执行mvn install -Dmaven.test.skip=true进行本地安装

第二种方式经过打包和编译

Step1:在Linkis项目中进入到ujes/jdbc目录而后在终端输入指令进行打包mvn assembly:assembly -Dmaven.test.skip=true 该打包指令会跳过单元测试的运行和测试代码的编译,并将JDBC模块须要的依赖一并打包进Jar包之中。
Step2:打包完成后在JDBC的target目录下会生成两个Jar包,Jar包名称中包含dependencies字样的那个就是咱们须要的Jar包

(2)创建测试类

创建Java的测试类UJESClientImplTestJ,具体接口含义能够见注释:
   

public static void main(String[] args) throws SQLException, ClassNotFoundException { //1. 加载驱动类: Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver"); //2. 得到链接:jdbc:linkis://gatewayIP:gatewayPort 账号和密码对应前端的账号密码 Connection connection = DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password"); //3. 建立statement 和执行查询 Statement st= connection.createStatement(); ResultSet rs=st.executeQuery("show tables"); //4.处理数据库的返回结果(使用ResultSet类) while (rs.next()) { ResultSetMetaData metaData = rs.getMetaData(); for (int i = 1; i <= metaData.getColumnCount(); i++) { System.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + " "); } System.out.println(); } //关闭资源 rs.close(); st.close(); connection.close();}
   


   
2-1 Linkis JDBC 任务执行结果


03typescript

数据库

模块设计方案微信


Linkis JDBC模块设计的初衷是为了方便用户经过JDBC的方式,便捷的提交SQL任务到Linkis中执行,是客户端轻量化追求的一种体现,该模块的类大多以UJESSQL开头,表示JDBC模块属于linkis的ujes(Unified Job Execution Service,统一任务执行服务)模块的一部分。

Linkis的JDBC模块包含了五个关键的实现类:
  • UJESSQLDriver

  • UJESSQLConnection

  • UJESSQLStatement

  • UJESSQLPreStatement

  • UJESSQLResultSet

以及许多额外的辅助类,例如数据库元数据UJESSQLDatabaseMetaData,任务执行返回的结果集元数据UJESSQLResultMetaData等。

当UJESSQLDriver经过反射机制注册到DriverManager后,经过DM能够拿到UJESSQLConnection,接着即可以正常进行SQL任务提交和获取结果集,下面是结果集获取时的方法调用时序图:


下面将逐一介绍JDBC关键类在Linkis中的实现方案。

(1)驱动类UJESSQLDriver
在 JDBC的层次上,sun主要定义了一个接口Driver和两个类:DirverManager和DriverInfo,每一个JDBC驱动程序必须实现 Driver接口,例如MySql的Connector/J驱动中,叫作com.mysql.jdbc.Driver,在Linkis的JDBC中的驱动实现类为UJESSQLDriver。使用时经过Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver")的方法显示地让JVM尝试加载类,并相应的调用静态代码块完成驱动类的注册。UJESSQLDriver的主要代码以下:
static { try { DriverManager.registerDriver(new UJESSQLDriver()); } catch (SQLException e) { Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class); logger.info("Load driver failed",e); }}
经过调用DriverManager的注册方法registerDriver将该驱动类注册到DriverManager中,当用户调用DriverManager的getConnection时,DriverManager会检索全部已经注册的驱动类,并根据驱动类的类名和请求URL中的类名进行对比,寻找出对应的驱动类。

(2)JDBC链接器UJESSQLConnection

Linkis JDBC中链接器为UJESSQLConnection,实现了java.sql.connection接口。注册驱动以后,能够经过传入指定的数据库链接路径,用户名和密码即可获取数据库链接对象。
conn = (UJESSQLConnection) DriverManager  .getConnection("jdbc:linkis://hostname:port","username","password")
DriverManager 的getConnection方法将传入的参数进行处理和转换,调用Driver的connect方法,再将参数传入UJESSQLConnection的构造器中初始化,返回给用户。下面是UJESSQLDriver实现的的connect方法:
override def connect(url: String, info: Properties): Connection = if(acceptsURL(url)) { val props = if(info != null) info else newProperties props.putAll(parseURL(url)) val ujesClient =UJESClientFactory.getUJESClient(props) new UJESSQLConnection(ujesClient, props)} else throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
private def parseURL(url: String): Properties= { val props = new Properties //add an entry to get url props.setProperty("URL", url) url match { case URL_REGEX(host, port, db, params)=> if(StringUtils.isNotBlank(host))props.setProperty(HOST, host) if(StringUtils.isNotBlank(port))props.setProperty(PORT, port.substring(1)) if(StringUtils.isNotBlank(db)&& db.length > 1) props.setProperty(DB_NAME, db.substring(1)) if(StringUtils.isNotBlank(params)&& params.length > 1) { val _params = params.substring(1) val kvs =_params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter { case Array(USER, value) => props.setProperty(USER, value) false case Array(PASSWORD, value) => props.setProperty(PASSWORD,value) false case Array(key, _) => if(StringUtils.isBlank(key)) { throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } else true case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } props.setProperty(PARAMS,kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT)) } case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url) } props}

在构造好链接参数以后,会调用UJESClientFactory.getUJESClient(prop)方法建立一个新的ujesclient的linkis访问客户端,用于提交和查询linkis任务。
  
def getUJESClient(props: Properties):UJESClient = { val host = props.getProperty(HOST) val port = props.getProperty(PORT) val serverUrl = if(StringUtils.isNotBlank(port))s"http://$host:$port" else "http://" + host if(ujesClients.containsKey(serverUrl))ujesClients.get(serverUrl) else serverUrl.intern synchronized { if(ujesClients.containsKey(serverUrl))return ujesClients.get(serverUrl) val ujesClient =createUJESClient(serverUrl, props) ujesClients.put(serverUrl, ujesClient) ujesClient }}
将建立好的ujesclient和数据库参数props传入UJESSQLConnection的构造器,最终获得一个完整的UJESSQLConnection对象。

(3)执行对象UJESSQLStatement/UJESSQLPreStatement

执行对象在整个JDBC链接和使用的生命周期中,属于请求保存者和执行者的身份,每一个JDBC链接器在生成以后,能够经过调用链接器Connection的createStatement方法获取执行对象,相似地,也能够经过Connection的prepareStatement方法获取预执行对象。
     
//获取执行对象UJESSQLStatementstatement = (UJESSQLStatementCon) conn.createStatement;//获取预执行对象UJESSQLPrepareStatementpreStatement = (UJESSQLPrepareStatement) conn.prePareStatement;
UJESSQLStatement中最为重要的方法execute做为提交SQL任务执行的入口,任务提交的执行主流程以下:
Step1 调用hook修改sql。
Step2 生成用于提交linkis任务的action。
Step3 利用ujes客户端提交job到linkis执行。
Step4 检测job的状态翻转。
Step5 获取结果集ResultSet。

UJESSQLStatement中提交任务执行的Execute方法的代码:
override defexecute(sql: String): Boolean = throwWhenClosed { var parsedSQL = sql //预执行hook,转换不支持的sql语句 JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach{ preExecution => parsedSQL = preExecution.callPreExecutionHook(parsedSQL) } //获取linkis的job执行器,建立用于执行的action任务 val action =JobExecuteAction.builder().setEngineType(EngineType.SPARK).addExecuteCode(parsedSQL) .setCreator(ujesSQLConnection.creator).setUser(ujesSQLConnection.user) if(ujesSQLConnection.variableMap.nonEmpty)action.setVariableMap(JavaConversions.mapAsJavaMap(ujesSQLConnection.variableMap)) //提交SQL任务到ujes客户端执行 jobExecuteResult =ujesSQLConnection.ujesClient.execute(action.build()) queryEnd = false //job状态检测 var status =ujesSQLConnection.ujesClient.status(jobExecuteResult) val atMost = if(queryTimeout > 0) Duration(queryTimeout,TimeUnit.MILLISECONDS) else Duration.Inf if(!status.isCompleted)Utils.tryThrow{ Utils.waitUntil(() =>{ status =ujesSQLConnection.ujesClient.status(jobExecuteResult) status.isCompleted ||closed }, atMost, 100, 10000) } { case t: TimeoutException=> if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } if(!closed) { var jobInfo =ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) if(status.isFailed)throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode,jobInfo.getRequestPersistTask.getErrDesc) val jobInfoStatus =jobInfo.getJobStatus if(!jobInfoStatus.equals("Succeed")) Utils.tryThrow{ Utils.waitUntil(()=> { jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) val state =jobInfo.getJobStatus match{ case"Failed" | "Cancelled" | "Timeout" |"Succeed" => true case _ => false } state || closed }, atMost, 100, 10000) } { case t:TimeoutException => if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } //获取结果集 val resultSetList =jobInfo.getResultSetList(ujesSQLConnection.ujesClient) queryEnd = true if(resultSetList !=null) { resultSet = newUJESSQLResultSet(resultSetList, this, maxRows, fetchSize) true } else false } else throw newUJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement isclosed.")}
一样的UJESPrepareStatement中的excute方法继承自Statement,原理一致。

UJESPrepareStatement与UJESStatement的不一样之处在于,statement每次执行sql语句,相关数据库都要执行sql语句的编译,preparedstatement是预编译的, 且支持批处理。

(4)结果集UJESSQLResultSet
当用户提交完SQL任务到Linkis后,会检测用户的job是否已经完成,完成时调用getResultSet的方法获取结果集UJESSQLResultSet。

在UJESSQLResultSet初次加载的时候,java虚拟机会调用初始化的init()方法,该方法会执行三个初始化的步骤,用于构建结果集:
Step1:经过resultSetResultInit方法设置获取结果集相关的参数,如当前用户和结果集路径,而后经过ujesClient拿到结果集。
Step2:经过metaDataInit方法获取结果集的元数据。
Step3:经过resultSetInit方法获取结果集的内容。
private def init(): Unit = { resultSetResultInit() metaDataInit() resultSetInit()} private def resultSetResultInit(): Unit = { if (path == null) path =getResultSetPath(resultSetList) val user =connection.getProps.getProperty("user") if(StringUtils.isNotBlank(path)){ val resultAction =ResultSetAction.builder.setUser(user).setPath(path).build() resultSetResult =connection.ujesClient.resultSet(resultAction) }} private def metaDataInit(): Unit = { if ( null ==resultSetResult ){ return } metaData =resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]] for(cursor <- 1 tometaData.size()){ val col =metaData.get(cursor - 1) resultSetMetaData.setColumnNameProperties(cursor,col.get("columnName")) resultSetMetaData.setDataTypeProperties(cursor,col.get("dataType")) resultSetMetaData.setCommentPropreties(cursor,col.get("comment")) }} private def resultSetInit(): Unit = { if ( null ==resultSetResult ){ return } resultSetRow =resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]}

UJESSQLResultSet中的next()方法将currentRowCursor做为移动游标。每次从结果集中读取数据后都会相应地更新游标的位置。若是next方法返回true,则能够调用getXXX()方法获取相关字段数据,反之则说明当前游标并未指向一条有效记录,读取过程直接结束。
override def next(): Boolean = { if(metaData == null)init() currentRowCursor += 1 if(null == resultSetRow ||currentRowCursor > resultSetRow.size()-1) false else{ updateCurrentRow(currentRowCursor) true }}

当next() 方法返回true时,会相应地调用getXXX()方法读取数据。以getString()方法为例:
override def getString(columnIndex: Int): String = { val any = getColumnValue(columnIndex) if(wasNull()) { throw newUJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null") }else{ any.asInstanceOf[String] }}

该方法调用getColumnValue读取数据,并将其转化为String类型的值返回。

(5)错误码方案

为了便于用户利用Linkis JDBC提交SQL执行,减小错误排查时间,咱们在Linkis JDBC中对常见的错误生成了错误码,尽可能覆盖整个JDBC提交和执行过程。错误码编号范围初步设定在80000~80100,常见的错误类型包括:参数类型错误、方法暂不支持、操做逻辑出错以及返回类型错误等。
public enum UJESSQLErrorCode {  BAD_URL(80000,"badurl"), NOSUPPORT_DRIVER(80001,"this method not supported in driver"),  NOSUPPORT_CONNECTION(80002,"this method not supported in connection"), NOSUPPORT_STATEMENT(80003,"this method not supported instatement"), CONNECTION_CLOSED(80004,"Connection is closed!"), STATEMENT_CLOSED(80005,"statement is closed!"),  SCHEMA_EMPTY(80006,"schemais empty!"), SCHEMA_FAILED(80007,"Get schema failed!"), QUERY_TIMEOUT(80008,"query has been timeout!"), FILETYPE_ERROR(80009,"file type error"), METADATATYPE_ERROR(80010,"metadata type error"),  NOSUPPORT_METADATA(80011"thismethod not supported in DatabaseMetaData"), NOPERMITION(80012,"This user has no permission to read thisfile!"),  PARAMS_NOT_FOUND(80013,"Parameter not found"), ERRORINFO_FROM_JOBINFO(80014,"get errorinfo from jobInfo"),  RESULTSET_ROWERROR(80015,"rowmessage error"), NOSUPPORT_RESULTSET(80016,"this method not supported inresultSet"), RESULTSET_NULL(80017,"resultset is null,try to run next() firstlyto init ResultSet and MetaData"), PREPARESTATEMENT_TYPEERROR(80018,"parameter type error"), METADATA_EMPTY(80019,"data is empty") ; private String msg; private int code;
UJESSQLErrorCode(intcode,String msg) { this.code = code; this.msg = msg; }
public String getMsg() { return msg; }
public int getCode() { return code; }}


04

实现方案总结


    Linkis JDBC模块设计的初衷是为了用户可以方便的经过JDBC的方式提交SQL任务到LInkis执行,在实现的过程当中,咱们参考了文章[1]进行初步的框架设计,实现过程当中对于任务的提交和封装参考了Linkis ujes中与job相关的文档,阅读了一些JDBC相关的文章[2][3]。设计的过程当中仍有一部分非必要的接口没有实现,这是参考Kylin、Hive等项目中JDBC模块设计综合考量后的结果,在不影响使用效果的前提降低低开发成本。

    Linkis自己做为大数据产品的链接器,具备强大的集成和可拓展性,JDBC模块也是Linkis的向外兼容的一个具体实现,期待服务于社区一岁多的Linkis可以茁壮成长,在你们的共同栽培下枝繁叶茂。

     

    05

    参考文献


    [1] create-your-own-type-3-jdbc-driver
    https://www.javaworld.com/article/2074249/create-your-own-type-3-jdbc-driver--part-1.html
    [2] Java JDBC的优雅设计
    https://blog.csdn.net/yisizhu/article/details/104025220
    [3] Class.forName加载JDBC驱动程序时,底层都作了些什么???
    https://www.cnblogs.com/liuxianan/archive/2012/08/04/2623258.html

    WeDataSphere,BIG DATA MADE EASY.

    用心作一个有温度的开源社区

    ~欢迎关注~


    扫码关注咱们

    微信号公众号 : WeDataSphere

    GitHub:WeDataSphere

    若是喜欢咱们的产品或文章,请给咱们的GitHub点上你宝贵的star和fork哦~~


    本文分享自微信公众号 - WeDataSphere(gh_273e85fce73b)。
    若有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

    相关文章
    相关标签/搜索