Apache Flink 从 1.9.0 版本开始增长了与 Hive 集成的功能,用户能够经过 Flink 来访问 Hive 的元数据,以及读写 Hive 中的表。本文将主要从项目的设计架构、最新进展、使用说明等方面来介绍这一功能。sql
SQL 是大数据领域中的重要应用场景,为了完善 Flink 的生态,发掘 Flink 在批处理方面的潜力,咱们决定加强 FlinkSQL 的功能,从而让用户可以经过 Flink 完成更多的任务。数据库
Hive 是大数据领域最先出现的 SQL 引擎,发展至今有着丰富的功能和普遍的用户基础。以后出现的 SQL 引擎,如 Spark SQL、Impala 等,都在必定程度上提供了与 Hive 集成的功能,从而方便用户使用现有的数据仓库、进行做业迁移等。所以咱们认为提供与 Hive 交互的能力对于 FlinkSQL 也是很是重要的。架构
与 Hive 集成主要包含了元数据和实际表数据的访问,所以咱们会从这两方面介绍一下该项目的架构。oop
为了访问外部系统的元数据,Flink 提供了 ExternalCatalog 的概念。可是目前 ExternalCatalog 的定义很是不完整,基本处于不可用的状态。所以,咱们提出了一套全新的 Catalog 接口来取代现有的 ExternalCatalog。新的 Catalog 可以支持数据库、表、分区等多种元数据对象;容许在一个用户 Session 中维护多个 Catalog 实例,从而同时访问多个外部系统;而且 Catalog 以可插拔的方式接入 Flink,容许用户提供自定义的实现。下图展现了新的 Catalog API 的整体架构。性能
建立 TableEnvironment 的时候会同时建立一个 CatalogManager,负责管理不一样的 Catalog 实例。TableEnvironment 经过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。测试
目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将全部元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例链接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户须要配置一个 HiveCatalog,并经过 HiveCatalog 访问 Hive 中的元数据。另外一方面,HiveCatalog 也能够用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 做为持久化存储使用,写入 Hive Metastore 中的元数据并不必定是 Hive 所支持的格式。一个 HiveCatalog 实例能够同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据建立不一样的实例。大数据
另外,咱们设计了 HiveShim 来支持不一样版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。优化
咱们提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽量的复用了 Hive 自己的 Input/Output Format 和 SerDe 等类,这样作的好处一方面是减小了代码重复,更重要的是能够最大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 能够正常读取,而且反之亦然。ui
与 HiveCatalog 相似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。spa
Flink 与 Hive 集成的功能会在 1.9.0 版本中做为试用功能发布,用户能够经过 Table API 或者 SQL Client 的模式与 Hive 进行交互。下面列出的是在 1.9.0 中已经支持的功能:
因为是试用功能,所以还有一些方面不够完善,下面列出的是在 1.9.0 中缺失的功能:
部分数据类型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。
使用 Flink 与 Hive 集成的功能,用户首先须要添加相应的依赖。若是是使用 SQL Client,则须要将依赖的 jar 添加到 Flink 的 lib 目录中;若是使用 Table API,则须要将相应的依赖添加到项目中(如pom.xml)。
如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是针对不一样版本所需的依赖。
其中 flink-shaded-hadoop-2-uber 包含了 Hive 对于 Hadoop 的依赖。若是不用 Flink 提供的包,用户也能够将集群中使用的 Hadoop 包添加进来,不过须要保证添加的 Hadoop 版本与 Hive 所依赖的版本是兼容的(Hive 2.3.4 依赖的 Hadoop 版本是 2.7.2;Hive 1.2.1 依赖的 Hadoop 版本是 2.6.0)。
依赖的 Hive 包(即 hive-exec 和 hive-metastore)也可使用用户集群中 Hive 所提供的 jar 包,详情请见支持不一样的 Hive 版本。
要与 Hive 交互,必须使用 HiveCatalog,下面介绍一下如何配置 HiveCatalog。
使用 SQL Client 时,用户须要在 sql-client-defaults.yaml 中指定本身所需的 Catalog,在 sql-client-defaults.yaml 的“catalogs”列表中能够指定一个或多个 Catalog 实例。如下的示例展现了如何指定一个 HiveCatalog:
catalogs: # A typical catalog definition looks like: - name: myhive type: hive hive-conf-dir: /path/to/hive_conf_dir hive-version: 2.3.4
其中 name 是用户给每一个 Catalog 实例指定的名字, Catalog 名字和 DB 名字构成了 FlinkSQL 中元数据的命名空间,所以须要保证每一个 Catalog 的名字是惟一的。type 表示 Catalog 的类型,对于 HiveCatalog 而言,type 应该指定为 hive。hive-conf-dir 用于读取 Hive 的配置文件,用户能够将其设定为集群中 Hive 的配置文件目录。hive-version 用于指定所使用的 Hive 版本,能够设定为 2.3.4 或者 1.2.1。
指定了 HiveCatalog 之后,用户就能够启动 sql-client,并经过如下命令验证 HiveCatalog 已经正确加载。
Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive;
其中 show catalogs 会列出加载的全部 Catalog 实例。须要注意的是,除了用户在sql-client-defaults.yaml 文件中配置的 Catalog 之外,FlinkSQL 还会自动加载一个 GenericInMemoryCatalog 实例做为内置的 Catalog,该内置 Catalog 默认名字为 default_catalog。
使用 use catalog 能够设定用户 Session 当前的 Catalog。用户在 SQL 语句中访问元数据对象(如 DB、Table 等)时,若是不指定 Catalog 名字,则 FlinkSQL 会在当前 Catalog 中进行查找。
下面的代码展现了如何经过 TableAPI 来建立 HiveCatalog,并注册到 TableEnvironment。
String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/path/to/hive_conf_dir"; String version = "2.3.4"; TableEnvironment tableEnv = …; // create TableEnvironment HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog(name, hiveCatalog); tableEnv.useCatalog(name);
将 HiveCatalog 注册到 TableEnvironment 之后,就能够在经过 TableEnvironment 提交 SQL 的时候访问 HiveCatalog 中的元数据了。与 SQL Client 相似, TableEnvironment 也提供了 useCatalog 接口让用户设定当前 Catalog。
设置好 HiveCatalog 之后就能够经过 SQL Client 或者 Table API 来读写 Hive 中的表了。
假设 Hive 中已经有一张名为 src 的表,咱们能够用如下的 SQL 语句来读写这张表。
Flink SQL> describe src; root |-- key: STRING |-- value: STRING Flink SQL> select * from src; key value 100 val_100 298 val_298 9 val_9 341 val_341 498 val_498 146 val_146 458 val_458 362 val_362 186 val_186 …… …… Flink SQL> insert into src values ('newKey','newVal');
相似的,也能够经过 Table API 来读写上面提到的这张表。下面的代码展现了如何实现这一操做。
TableEnvironment tableEnv = …; // create TableEnvironment tableEnv.registerCatalog("myhive", hiveCatalog); // set myhive as current catalog tableEnv.useCatalog("myhive"); Table src = tableEnv.sqlQuery("select * from src"); // write src into a sink or do further analysis …… tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')"); tableEnv.execute("insert into src");
Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前咱们只针对这两个版本进行了测试。使用 SQL Client 时,若是用户没有在 sql-client-defaults.yaml 文件中指定 Hive 版本,咱们会自动检测 classpath 中的 Hive 版本。若是检测到的 Hive 版本不是 2.3.4 或 1.2.1 就会报错。
借助 Hive 兼容性的保证,其它不一样的小版本也比较多是能够正常工做的。所以,若是用户使用的 Hive 小版本与咱们所支持的不一样,能够指定一个支持的版原本试用与 Hive 集成的功能。好比用户使用的 Hive 版本是 2.3.3,能够在 sql-client-defaults.yaml 文件或者代码中将 Hive 版本指定为 2.3.4。
Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工做,所以若是用户想要使用 Hive 的 TableSink,须要将执行模式设置为 batch。
Flink 1.9.0 增长了新的 blink planner,因为 blink planner 相比于原来的 planner 功能更加全面,所以咱们建议在使用 FlinkSQL 与 Hive 集成时使用 blink planner。后续新的功能也可能会只支持 blink planner。
使用 SQL Client 时能够像这样在 sql-client-defaults.yaml 中指定执行模式和 planner:
execution: # select the implementation responsible for planning table programs # possible values are 'old' (used by default) or 'blink' planner: blink # 'batch' or 'streaming' execution type: batch
对应的 Table API 的写法以下:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings);
咱们会在 Flink 后续版本中进一步完善与 Hive 集成的功能,预计会在 1.10.0 版本中实现 Production-Ready。咱们在后续版本中计划开展的工做包括:
欢迎你们试用 Flink 1.9 中的 Hive 功能,若是遇到任何问题也欢迎你们经过钉钉、邮件列表等方式与咱们联系。
本文做者:巴蜀真人
本文为云栖社区原创内容,未经容许不得转载。