Kylin 用户在使用 Spark的过程当中,常常会遇到任务提交缓慢、构建节点不稳定的问题。为了更方便地向 Spark 提交、管理和监控任务,有些用户会使用 Livy 做为 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了经过 Apache Livy 递交 Spark 任务的新功能[KYLIN-3795],特此感谢滴滴靳国卫同窗对此功能的贡献。git
Apache Livy 是一个基于 Spark 的开源 REST 服务,是 Apache 基金会的一个孵化项目,它可以经过 REST 的方式将代码片断或是序列化的二进制代码提交到 Spark 集群中去执行。它提供了以下基本功能:github
Apache Livy 架构sql
1. 当前 Spark 存在的问题shell
Spark 当前支持两种交互方式:apache
两种方式都须要用户登陆到 Gateway 节点上经过脚本启动 Spark 进程,可是会出现如下问题:服务器
2. Livy 优点session
一方面,接受并解析用户的 REST 请求,转换成相应的操做;另外一方面,它管理着用户所启动的全部的 Spark 集群。架构
Livy 具备以下功能:ide
1. 引入 Livy 以前 Kylin 是如何使用 Spark 的工具
Spark 是在 Kylin v2.0 引入的,主要应用于 Cube 构建,构建过程介绍能够查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/
下面是 SparkExecutable 类的 doWork 方法关于提交 Spark job 的一段代码,咱们能够看到 Kylin 会从配置中获取 Spark job 包的路径(默认为 $KYLIN_HOME/lib),经过本地指令的形式提交 Spark job,而后循环获取 Spark job 的执行状态和结果。咱们能够看到 Kylin 单独开了一个线程在本地向 Spark 客户端发送来 job 请求而且循环获取结果,额外增长了节点系统压力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... String jobJar = config.getKylinJobJarPath(); //获取job jar的路径 //略... final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //构建本地command //略... //建立指令执行线程 Callable callable = new Callable<Pair<Integer, String>>() { @Override public Pair<Integer, String> call() throws Exception { Pair<Integer, String> result; try { result = exec.execute(cmd, patternedLogger); } catch (Exception e) { logger.error("error run spark job:", e); result = new Pair<>(-1, e.getMessage()); } return result; } }; //略... try { Future<Pair<Integer, String>> future = executorService.submit(callable); Pair<Integer, String> result = null; while (!isDiscarded() && !isPaused()) { if (future.isDone()) { result = future.get(); //循环获取指令执行结果 break; } else { Thread.sleep(5000); //每隔5秒检查一次job执行状态 } } //略... } catch (Exception e) { logger.error("Error run spark job:", e); return ExecuteResult.createError(e); } //略... }
2. Livy for Kylin 详细解析
Livy 向 Spark 提交 job 一共有两种,分别是 Session 和 Batch,Kylin 是经过 Batch 的方式提交 job 的,须要提早构建好 Spark job 对应的 jar 包并上传到 HDFS 中,而且将配置项 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。
Batch 一共具备以下九种状态:
public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down; }
下面是 SparkExecutableLivy 类的 doWork 方法和 LivyRestExecutor 类的 execute 方法关于提交 Spark job 的一段代码,Kylin 经过 livyRestBuilder 读取配置文件获取 Spark job 的包路径,而后经过 restClient 向 Livy 发送 Http 请求。在提交 job 以后会每隔 10 秒查询一次 job 执行的结果,直到 job 的状态变为 shutting_down, error, dead, success 中的一种。每一次都是经过 Http 的方式发送请求,相比较于经过本地 Spark 客户端提交任务,更加稳定并且减小了 Kylin 节点系统压力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job); executor.execute(livyRestBuilder, patternedLogger); //调用LivyRestExecutor类的execute方法 if (isDiscarded()) { return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded"); } if (isPaused()) { return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped"); } //略... } public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) { LivyRestClient restClient = new LivyRestClient(); String result = restClient.livySubmitJobBatches(dataJson); //向Livy发送http请求 JSONObject resultJson = new JSONObject(result); String state = resultJson.getString("state"); //获得Livy请求结果 final String livyTaskId = resultJson.getString("id"); while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state) && !LivyStateEnum.error.toString().equalsIgnoreCase(state) && !LivyStateEnum.dead.toString().equalsIgnoreCase(state) && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) { String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //获取Spark job执行状态 JSONObject stateJson = new JSONObject(statusResult); if (!state.equalsIgnoreCase(stateJson.getString("state"))) { logAppender.log("Livy status Result: " + stateJson.getString("state")); } state = stateJson.getString("state"); Thread.sleep(10*1000); //每10秒检查一次结果 } }
3. Livy 在 Kylin 中的应用
构建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 本来都是经过 Hive 客户端(Cli 或 Beeline)进行构建的,引入 Livy 以后,Kylin 经过 Livy 来调用 SparkSQL 进行构建,提升了平表的构建速度。在引入 Livy 以后,Cube 的构建主要改变的是如下几个步骤,对应的任务日志输出以下:
1)构建 Cube
2)转换 Cuboid 为 HFile
4. 引入 Livy 对 Kylin 的好处
5. 如何在 Kylin 中启用 Livy
在 Kylin 启用 Livy 前,请先确保 Livy 可以正常工做
1)在 Kylin.properties 中,加入以下配置,并重启使之生效。
//此处为CDH5.7环境下的配置 kylin.engine.livy-conf.livy-enabled=true kylin.engine.livy-conf.livy-url=http://cdh-client:8998 kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar //请根据我的环境替换对应版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar
其中 livy-key.file 和 livy-arr.jars 地址之间不要有空格,不然可能会出不可预知的错误。
2)Cube 构建引擎选用 Spark。
如下问题每每为使用不当和配置错误的缘由,非 Kylin 自己存在的问题,此处仅为友情提示。
1. Table or view not found
输出日志:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;
解决方法:
//将hive-site.xml拷贝到spark的配置文件目录中 ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR
2. livy request 400 error
解决方法:
//kylin.properties Livy配置项jar包地址之间不要留空格 //此处为CDH5.7环境下的依赖包,请根据我的环境替换对应版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/
3. NoClassDefFoundError
输出日志:
NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos
解决方法:
find /opt -type f -name "hbase-protocol*.jar" cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars
4. livy sql 执行错误
解决方法:
//kylin.properties中添加以下配置 kylin.source.hive.quote-enabled=false
Livy 本质上是在 Spark 上的 REST 服务,对于 Kylin cube 的构建没有本质上的性能提高,可是经过引入 Livy,Kylin 可以直接经过 Spark SQL 代替 Hive 构建 Flat Table,并且管理 Spark job 也更加方便。可是,Livy 当前也存在一些问题,好比使用较低或较高版本的 Spark 没法正常工做以及单点故障等问题,用户能够考虑自身的实际场景选择是否须要在 Kylin 中使用 Livy。
做者简介:王汝鹏,Kyligence 大数据研发工程师,主要负责 Apache Kylin 社区维护和开发。GitHub:https://github.com/rupengwang。