Flink 1.11 新特性之 SQL Hive Streaming 简单示例

7月7日,Flink 1.11 版本发布,与 1.10 版本相比,1.11 版本最为显著的一个改进是 Hive Integration 显著加强,也就是真正意义上实现了基于 Hive 的流批一体。php


本文用简单的本地示例来体验 Hive Streaming 的便利性并跟你们分享体验的过程以及个人心得,但愿对你们上手使用有所帮助。html


添加相关依赖web


测试集群上的 Hive 版本为 1.1.0,Hadoop 版本为 2.6.0,Kafka 版本为 1.0.1。

<properties> <scala.bin.version>2.11</scala.bin.version> <flink.version>1.11.0</flink.version> <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version> <hive.version>1.1.0</hive.version></properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>${flink-shaded-hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency>

另外,别忘了找到 hdfs-site.xml 和 hive-site.xml,并将其加入项目。

建立执行环境sql


Flink 1.11 的 Table/SQL API 中,FileSystem Connector 是靠加强版 StreamingFileSink 组件实现,在源码中名为 StreamingFileWriter。咱们知道,只有在 Checkpoint 成功时,StreamingFileSink 写入的文件才会由 Pending 状态变成 Finished 状态,从而可以安全地被下游读取。因此,咱们必定要打开 Checkpointing,并设定合理的间隔。


val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build()val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
   

注册 HiveCatalogapache


val catalogName = "my_catalog"val catalog = new HiveCatalog( catalogName, // catalog name "default", // default database "/Users/lmagic/develop", // Hive config (hive-site.xml) directory "1.1.0" // Hive version)tableEnv.registerCatalog(catalogName, catalog)tableEnv.useCatalog(catalogName)


建立 Kafka 流表json


Kafka Topic 中存储的是 JSON 格式的埋点日志,建表时用计算列生成事件时间与水印。1.11 版本 SQL Kafka Connector 的参数相比 1.10 版本有必定简化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
tableEnv.executeSql( """ |CREATE TABLE stream_tmp.analytics_access_log_kafka ( | ts BIGINT, | userId BIGINT, | eventType STRING, | fromType STRING, | columnType STRING, | siteId BIGINT, | grouponId BIGINT, | partnerId BIGINT, | merchandiseId BIGINT, | procTime AS PROCTIME(), | eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')), | WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND |) WITH ( | 'connector' = 'kafka', | 'topic' = 'ods_analytics_access_log', | 'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092' | 'properties.group.id' = 'flink_hive_integration_exp_1', | 'scan.startup.mode' = 'latest-offset', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) """.stripMargin)

前面已经注册了 HiveCatalog,故在 Hive 中能够观察到建立的 Kafka 流表的元数据(注意该表并无事实上的列)。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;OK# col_name data_type comment

# Detailed Table InformationDatabase: stream_tmpOwner: nullCreateTime: Wed Jul 15 18:25:09 CST 2020LastAccessTime: UNKNOWNProtect Mode: NoneRetention: 0Location: hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafkaTable Type: MANAGED_TABLETable Parameters: flink.connector kafka flink.format json flink.json.fail-on-missing-field false flink.json.ignore-parse-errors true flink.properties.bootstrap.servers kafka110:9092,kafka111:9092,kafka112:9092 flink.properties.group.id flink_hive_integration_exp_1 flink.scan.startup.mode latest-offset flink.schema.0.data-type BIGINT flink.schema.0.name ts flink.schema.1.data-type BIGINT flink.schema.1.name userId flink.schema.10.data-type TIMESTAMP(3) flink.schema.10.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss')) flink.schema.10.name eventTime flink.schema.2.data-type VARCHAR(2147483647) flink.schema.2.name eventType # 略...... flink.schema.9.data-type TIMESTAMP(3) NOT NULL flink.schema.9.expr PROCTIME() flink.schema.9.name procTime flink.schema.watermark.0.rowtime eventTime flink.schema.watermark.0.strategy.data-type TIMESTAMP(3) flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '15' SECOND flink.topic ods_analytics_access_log is_generic true transient_lastDdlTime 1594808709
# Storage InformationSerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeInputFormat: org.apache.hadoop.mapred.TextInputFormatOutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormatCompressed: NoNum Buckets: -1Bucket Columns: []Sort Columns: []Storage Desc Params: serialization.format 1Time taken: 1.797 seconds, Fetched: 61 row(s)


建立 Hive 表bootstrap


Flink SQL 提供了兼容 HiveQL 风格的 DDL,指定 SqlDialect.HIVE 便可( DML 兼容还在开发中)。

为了方便观察结果,如下的表采用了天/小时/分钟的三级分区,实际应用中能够不用这样细的粒度(10分钟甚至1小时的分区可能更合适)。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
tableEnv.executeSql( """ |CREATE TABLE hive_tmp.analytics_access_log_hive ( | ts BIGINT, | user_id BIGINT, | event_type STRING, | from_type STRING, | column_type STRING, | site_id BIGINT, | groupon_id BIGINT, | partner_id BIGINT, | merchandise_id BIGINT |) PARTITIONED BY ( | ts_date STRING, | ts_hour STRING, | ts_minute STRING |) STORED AS PARQUET |TBLPROPERTIES ( | 'sink.partition-commit.trigger' = 'partition-time', | 'sink.partition-commit.delay' = '1 min', | 'sink.partition-commit.policy.kind' = 'metastore,success-file', | 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00' |) """.stripMargin)

Hive 表的参数复用了 SQL FileSystem Connector 的相关参数,与分区提交(Partition Commit)密切相关。仅就上面出现的4个参数简单解释一下。

  • sink.partition-commit.trigger :触发分区提交的时间特征。默认为 processing-time,即处理时间,很显然在有延迟的状况下,可能会形成数据分区错乱。因此这里使用 partition-time,即按照分区时间戳(即分区内数据对应的事件时间)来提交。
  • partition.time-extractor.timestamp-pattern :分区时间戳的抽取格式。须要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段作占位符替换。显然,Hive 表的分区字段值来自流表中定义好的事件时间,后面会看到。
  • sink.partition-commit.delay :触发分区提交的延迟。在时间特征设为 partition-time 的状况下,当水印时间戳大于分区建立时间加上此延迟时,分区才会真正提交。此值最好与分区粒度相同,例如若 Hive 表按1小时分区,此参数可设为 1 h,若按 10 分钟分区,可设为 10 min。
  • sink.partition-commit.policy.kind :分区提交策略,能够理解为使分区对下游可见的附加操做。 metastore 表示更新 Hive Metastore 中的表元数据, success-file 则表示在分区内建立 _SUCCESS 标记文件。

固然,SQL FileSystem Connector 的功能并不限于此,还有很大自定义的空间(如能够自定义分区提交策略以合并小文件等)。具体可参见官方文档。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sinkapi


流式写入 Hive安全


注意将流表中的事件时间转化为 Hive 的分区。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)tableEnv.executeSql( """ |INSERT INTO hive_tmp.analytics_access_log_hive |SELECT | ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId, | DATE_FORMAT(eventTime,'yyyy-MM-dd'), | DATE_FORMAT(eventTime,'HH'), | DATE_FORMAT(eventTime,'mm') |FROM stream_tmp.analytics_access_log_kafka |WHERE merchandiseId > 0 """.stripMargin)

来观察一下流式 Sink 的结果吧。


上文设定的 Checkpoint Interval 是 20 秒,能够看到,上图中的数据文件刚好是以 20 秒的间隔写入的。因为并行度为 3,因此每次写入会生成 3 个文件。分区内全部数据写入完毕后,会同时生成 _SUCCESS 文件。若是是正在写入的分区,则会看到 .inprogress 文件。

经过 Hive 查询一下,肯定数据的时间无误。

  
    
  
  
   
   
            
   
   

  
    
  
  
   
   
            
   
   

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT))) > FROM hive_tmp.analytics_access_log_hive > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';OK2020-07-15 23:23:00 2020-07-15 23:23:59Time taken: 1.115 seconds, Fetched: 1 row(s)

流式读取 Hive微信


要将 Hive 表做为流式 Source,须要启用 Dynamic Table Options,并经过 Table Hints 来指定 Hive 数据流的参数。如下是简单地经过 Hive 计算商品 PV 的例子。

  
    
  
  
   
   
            
   
   

  
    
  
  
   
   
            
   
   

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery( """ |SELECT merchandise_id,count(1) AS pv |FROM hive_tmp.analytics_access_log_hive |/*+ OPTIONS( | 'streaming-source.enable' = 'true', | 'streaming-source.monitor-interval' = '1 min', | 'streaming-source.consume-start-offset' = '2020-07-15 23:30:00' |) */ |WHERE event_type = 'shtOpenGoodsDetail' |AND ts_date >= '2020-07-15' |GROUP BY merchandise_id |ORDER BY pv DESC LIMIT 10 """.stripMargin)
result.toRetractStream[Row].print().setParallelism(1)streamEnv.execute()
三个 Table Hint 参数的含义解释以下。

  • streaming-source.enable:设为 true,表示该 Hive 表能够做为 Source。
  • streaming-source.monitor-interval:感知 Hive 表新增数据的周期,以上设为 1 分钟。对于分区表而言,则是监控新分区的生成,以增量读取数据。
  • streaming-source.consume-start-offset:开始消费的时间戳,一样须要写成 yyyy-MM-dd HH:mm:ss 的形式。

更加具体的说明仍然可参见官方文档。

https://links.jianshu.com/go?to=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.11%2Fdev%2Ftable%2Fhive%2Fhive_streaming.html%23streaming-reading


最后,因为 SQL 语句中有 ORDER BY 和 LIMIT 逻辑,因此须要调用 toRetractStream() 方法转化为回撤流,便可输出结果。

The End


Flink 1.11 的 Hive Streaming 功能大大提升了 Hive 数仓的实时性,对 ETL 做业很是有利,同时还可以知足流式持续查询的需求,具备必定的灵活性。

感兴趣的同窗也能够本身上手测试。

原文连接:
https://www.jianshu.com/p/fb7d29abfa14



  一个实践机会  

Apache Flink 极客挑战赛


万众瞩目的第二届 Apache Flink 极客挑战赛来啦!本次大赛全面升级,重量级助阵嘉宾专业指导,强大的资源配置供你发挥创意,还有 30w 丰厚奖金等你带走~聚焦  Flink 与 AI 技术的应用实践,挑战疫情防控的世界级难题,你准备好了么?

(点击图片可了解更多大赛信息)

点击「 阅读 原文 」便可报名

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索